I am using an EMR cluster with Spark 2.4.0 installed.
I am pulling data from SQL Server into a Spark DataFrame using Spark SQL; here is the code for it:
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, SparkSession, SQLContext
from pyspark.sql.functions import lit
import datetime, calendar, time
import logging
import argparse
import func
import sys
conf = SparkConf().setAppName('test emr')
"""
Load tables from SQL Server
"""
env = 'prod'
load = 'test load'
sc = func.initiate_sc('load_test')
func.sc = sc
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sqlContext = SQLContext(sc)
logger = func.set_logger('test')
func.logger = logger
host = 'xxxx.xxxxx.ca-central-1.rds.amazonaws.com'
port = xxxx
dbuser = 'xxxx'
dbpass = 'xxxx'
dbname = 'xxxxx'
jdbc_url = 'jdbc:sqlserver://{host}:{port};database={db};user={user};password={pwd}'.format(host=host, port=port, db=dbname, user=dbuser, pwd=dbpass)
# Read from SQL Server
def main():
    test_query = "(select * from dbo.Company_Company) var"
    try:
        logger.info("inside the try block")
        df = sqlContext.read.jdbc(url=jdbc_url, table=test_query)
        df.write.save("s3://test/staging/Company")
        logger.info("test success count is {}".format(df.count()))
    except:
        logger.warning("test failed")
        logger.exception("message")
    logger.info("Test Completed")

if __name__ == '__main__':
    logger.info("Connecting to SQL Server...")
    main()
    sc.stop()
    logger.info("Exiting.")
    sys.exit()
I can pull the data into the DataFrame 'df', but I cannot write it to the S3 bucket; the write fails with the following error:
Traceback (most recent call last):
File "/home/hadoop/saurabh_temp/test.py", line 42, in main
df.write.save("s3://test/staging/Company")
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 736, in save
self._jwrite.save(path)
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o75.save.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2369)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2840)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2857)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2896)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2878)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:392)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:424)
at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:281)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2273)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)
... 23 more
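From the stack trace, Hadoop resolves the s3:// scheme to com.amazon.ws.emr.hadoop.fs.EmrFileSystem but cannot find that class on the classpath. A quick way to see which FileSystem class the s3 scheme is mapped to is to print the relevant Hadoop setting (a minimal sketch; sc._jsc is PySpark's internal handle to the Java SparkContext, not public API, and fs.s3.impl is the standard Hadoop key EMR uses for this mapping):

# Sketch: inspect which FileSystem implementation is configured for s3://.
hadoop_conf = sc._jsc.hadoopConfiguration()
logger.info("fs.s3.impl = {}".format(hadoop_conf.get("fs.s3.impl")))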
But when I start the pyspark shell and run the same statements step by step, like this:
Using Python version 2.7.15 (default, Nov 28 2018 22:38:08)
SparkSession available as 'spark'.
>>> host = 'xxxx.xxxx.ca-central-1.rds.amazonaws.com'
>>> port = xxxx
>>> dbuser = 'xxxx'
>>> dbpass = 'xxxxx'
>>> dbname = 'xxxxxx'
>>> jdbc_url = 'jdbc:sqlserver://{host}:{port};database={db};user={user};password={pwd}'.format(host=host, port=port, db=dbname, user=dbuser, pwd=dbpass)
>>>
>>>
>>>
>>> test_query="(select * from dbo.Company_Company) var"
>>> df = sqlContext.read.jdbc(url=jdbc_url, table=test_query)
>>> df.write.save("s3://test/staging/Company")
>>> exit()
I can write my DataFrame to S3 without any problem. So I cannot see what I am doing wrong in my script that prevents it from writing to S3.
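For what it's worth, the JDBC read should not matter here: a stripped-down script that only writes a trivial DataFrame to S3 exercises the same code path, so, assuming it is launched the same way as my script above, something like the sketch below should reproduce the error without SQL Server in the picture (the bucket path is a placeholder):

from pyspark.sql import SparkSession

# Minimal sketch: a one-row DataFrame is enough to trigger the
# FileSystem lookup for the s3:// scheme on write.
spark = SparkSession.builder.appName('s3 write test').getOrCreate()
df = spark.createDataFrame([(1, 'test')], ['id', 'value'])
df.write.save("s3://test/staging/write_test")  # placeholder path
spark.stop()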