Я использую пользовательский источник данных, расположенный здесь https://github.com/sourav-mazumder/Data-Science-Extensions/releases
Когда я работаю с этим локально, используя среду Dockerized Spark, она работает как положено.Однако, когда я использую его в EMR, я получаю ошибку в названии этого вопроса.Ниже приведены параметры конфигурации EMR, сообщения о запуске Spark, тестовый код и результат при его запуске.Я не уверен, что еще мне нужно настроить.
Конфигурация:
[
{
"configurations": [
{
"classification": "export",
"properties": {
"PYSPARK_PYTHON": "/usr/bin/python3"
}
}
],
"classification": "spark-env",
"properties": {}
},
{
"configurations": [
{
"classification": "export",
"properties": {
"PYSPARK_PYTHON": "/usr/bin/python3"
}
}
],
"classification": "yarn-env",
"properties": {}
},
{
"classification": "spark-defaults",
"properties": {
"spark.executor.extraClassPath": "/home/hadoop/*",
"spark.driver.extraClassPath": "/home/hadoop/*",
"spark.jars.packages": "org.scalaj:scalaj-http_2.10:2.3.0"
}
}
]
Есть также загрузкаШаг для копирования JAR источника данных, доступного по ссылке выше, которую я загрузил на S3, на каждый узел в кластере искр в /home/hadoop
:
aws s3 cp s3://foo/spark-datasource-rest_2.11-2.1.0-SNAPSHOT.jar /home/hadoop/spark-datasource-rest_2.11-2.1.0-SNAPSHOT.jar
Когда я SSH в главный узел и запускаюСеанс PySpark (как ec2-user
) Я вижу следующие сообщения:
sudo pyspark
Python 3.6.7 (default, Dec 21 2018, 20:31:01)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.scalaj#scalaj-http_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-200abadc-d4a6-47dd-a2e9-110f77de3b4e;1.0
confs: [default]
found org.scalaj#scalaj-http_2.10;2.3.0 in central
:: resolution report :: resolve 140ms :: artifacts dl 5ms
:: modules in use:
org.scalaj#scalaj-http_2.10;2.3.0 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 1 | 0 | 0 | 0 || 1 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-200abadc-d4a6-47dd-a2e9-110f77de3b4e
confs: [default]
0 artifacts copied, 1 already retrieved (0kB/6ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/04/11 19:00:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/04/11 19:00:36 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
19/04/11 19:00:40 WARN Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.scalaj_scalaj-http_2.10-2.3.0.jar added multiple times to distributed cache.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.6.7 (default, Dec 21 2018 20:31:01)
SparkSession available as 'spark'.
Затем я запускаю свой тестовый код:
sodauri = 'https://soda.demo.socrata.com/resource/6yvf-kk3n.json'
sodainput1 = [("Nevada"), ("nn")]
sodainput2 = [("Northern California"), ("pr")]
sodainput3 = [("Virgin Islands region"), ("pr")]
sodainputRdd = spark.sparkContext.parallelize([sodainput1, sodainput2, sodainput3])
sodaDf = sodainputRdd.toDF(["region","source"])
sodaDf.createOrReplaceTempView('sodainputtbl')
prmsSoda = { 'url' : sodauri, 'input' : 'sodainputtbl', 'method' : 'GET', 'readTimeout' : '10000', 'connectionTimeout' : '2000', 'partitions' : '10'}
sodasDf = spark.read.format("org.apache.dsext.spark.datasource.rest.RestDataSource").options(**prmsSoda).load()
[Stage 6:> (0 + 1) / 1]19/04/11 20:34:39 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 8, ip-10-100-14-225.us-west-2.compute.internal, executor 3): java.lang.NoClassDefFoundError: scalaj/http/Http$
at org.apache.dsext.spark.datasource.rest.RestConnectorUtil$.callRestAPI(RestConnectorUtil.scala:53)
at org.apache.dsext.spark.datasource.rest.RESTRelation.org$apache$dsext$spark$datasource$rest$RESTRelation$$callRest(RestRelation.scala:128)
at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: scalaj.http.Http$
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 31 more
19/04/11 20:34:39 ERROR TaskSetManager: Task 0 in stage 6.0 failed 4 times; aborting job
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 172, in load
return self._df(self._jreader.load())
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o193.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 11, ip-10-100-14-225.us-west-2.compute.internal, executor 3): java.lang.NoClassDefFoundError: scalaj/http/Http$
at org.apache.dsext.spark.datasource.rest.RestConnectorUtil$.callRestAPI(RestConnectorUtil.scala:53)
at org.apache.dsext.spark.datasource.rest.RESTRelation.org$apache$dsext$spark$datasource$rest$RESTRelation$$callRest(RestRelation.scala:128)
at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$.infer(JsonInferSchema.scala:83)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:108)
at org.apache.spark.sql.DataFrameReader$$anonfun$2.apply(DataFrameReader.scala:439)
at org.apache.spark.sql.DataFrameReader$$anonfun$2.apply(DataFrameReader.scala:439)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:438)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:419)
at org.apache.dsext.spark.datasource.rest.RESTRelation.<init>(RestRelation.scala:101)
at org.apache.dsext.spark.datasource.rest.RestDataSource.createRelation(RestDataSource.scala:42)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: scalaj/http/Http$
at org.apache.dsext.spark.datasource.rest.RestConnectorUtil$.callRestAPI(RestConnectorUtil.scala:53)
at org.apache.dsext.spark.datasource.rest.RESTRelation.org$apache$dsext$spark$datasource$rest$RESTRelation$$callRest(RestRelation.scala:128)
at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
>>>
Что не так с моей конфигурацией?
Обновление
Я попытался настроить кластер EMR с scalaj-http_2.11
, но я все еще получаю ту же ошибку.Я также перестроил плагин, убедившись, что он создан для версии 2.11.12 Scala и Spark 2.4, такой же, как те, которые работают на EMR.Я все еще получаю ту же ошибку.
Теперь я просматриваю предупреждающие сообщения на случай, если это проблема:
Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Same path resource file:///home/ec2-user/.ivy2/jars/org.scalaj_scalaj-http_2.10-2.3.0.jar added multiple times to distributed cache.