Запись Spark DataFrame в s3? - PullRequest
0 голосов
/ 06 мая 2019

Я использую кластер EMR, на котором установлен spark.2.4.0.

Я перетаскиваю данные из SQL Server в фрейм данных spark, используя spark sql, вот код для него.

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, SparkSession, SQLContext
from pyspark.sql.functions import lit
import datetime, calendar, time
import logging
import argparse
import func
import sys


# Spark application configuration; note: `conf` is built here but never passed
# to the SparkSession below — presumably func.initiate_sc consumes it or sets
# its own config. TODO confirm against the `func` module.
conf = SparkConf().setAppName('test emr')

"""
Load tables from SQL Server 
"""
# Environment/label constants used by the job (values appear informational only).
env = 'prod'
load = 'test load'
# initiate_sc / set_logger come from the project-local `func` module; their
# exact behavior is not visible here. The created context and logger are
# injected back into `func` as module attributes for its internal use.
sc = func.initiate_sc('load_test')
func.sc = sc
# Hive-enabled session reusing the already-created SparkContext.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sqlContext = SQLContext(sc)
logger =  func.set_logger('test')
func.logger = logger

# SECURITY NOTE(review): database credentials are hardcoded in source;
# move them to a secrets manager or environment variables.
host = 'xxxx.xxxxx.ca-central-1.rds.amazonaws.com'
# NOTE(review): `xxxx` here is an unquoted placeholder — as written this line
# raises NameError; the real value must be an integer port number.
port = xxxx
dbuser = 'xxxx'
dbpass = 'xxxx'
dbname = 'xxxxx'
# SQL Server JDBC connection string with inline credentials.
jdbc_url = 'jdbc:sqlserver://{host}:{port};database={db};user={user};password={pwd}'.format(host=host, port=port, db=dbname, user=dbuser, pwd=dbpass)


# Read from SQL Server
def main():
    """Pull dbo.Company_Company from SQL Server via JDBC and write it to S3.

    Uses the module-level `sqlContext`, `jdbc_url`, and `logger`. Errors are
    logged (with traceback) rather than propagated.
    """
    # The subquery must carry a table alias ("var") for Spark's JDBC source.
    test_query = "(select * from dbo.Company_Company)  var"
    try:
        logger.info("inside the try block")
        df = sqlContext.read.jdbc(url=jdbc_url, table=test_query)
        df.write.save("s3://test/staging/Company")
        # NOTE(review): count() triggers a second full JDBC read; consider
        # df.cache() before the write if the table is large.
        logger.info("test success count is {}".format(df.count()))
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still
        # propagate; logger.exception records the full traceback.
        logger.warning("test failed")
        logger.exception("message")
    logger.info("Test Completed")

if __name__ == '__main__':
    logger.info("Connecting to SQL Server...")
    try:
        main()
    finally:
        # Ensure the SparkContext is released even if main() raises,
        # so executors/YARN resources are not leaked.
        sc.stop()
        logger.info("Exiting.")
    sys.exit()

Я могу вытащить данные в фрейм данных 'df', но я не могу записать его в ведро s3 — при записи возникает следующая ошибка.

Traceback (most recent call last):
  File "/home/hadoop/saurabh_temp/test.py", line 42, in main
    df.write.save("s3://test/staging/Company")
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 736, in save
    self._jwrite.save(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o75.save.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2369)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2840)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2857)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2896)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2878)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:392)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
    at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:424)
    at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:281)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2273)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)
    ... 23 more

Но когда я запускаю свою оболочку pyspark и выполняю все инструкции шаг за шагом, как

Using Python version 2.7.15 (default, Nov 28 2018 22:38:08)
SparkSession available as 'spark'.
>>> host = 'xxxx.xxxx.ca-central-1.rds.amazonaws.com'
>>> port = xxxx
>>> dbuser = 'xxxx'
>>> dbpass = 'xxxxx'
>>> dbname = 'xxxxxx'
>>> jdbc_url = 'jdbc:sqlserver://{host}:{port};database={db};user={user};password={pwd}'.format(host=host, port=port, db=dbname, user=dbuser, pwd=dbpass)
>>> 
>>> 
>>> 
>>> test_query="(select * from dbo.Company_Company)  var"
>>> df = sqlContext.read.jdbc(url=jdbc_url, table=test_query)
>>> df.write.save("s3://test/staging/Company")
>>> exit()                   

, я могу записать свой фрейм данных в s3. Поэтому я не понимаю, что я делаю неправильно в своем коде, из-за чего запись в s3 не срабатывает.

...