Custom data source plugin on EMR throws java.lang.NoClassDefFoundError: scalaj/http/Http
2 votes
/ April 11, 2019

I am using a custom data source available here: https://github.com/sourav-mazumder/Data-Science-Extensions/releases

When I use it locally in a Dockerized Spark environment, it works as expected. However, when I use it on EMR, I get the error in the title of this question. Below are the EMR configuration, the Spark startup messages, the test code, and the output it produces. I am not sure what else I need to configure.

Configuration:

[
  {
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "PYSPARK_PYTHON": "/usr/bin/python3"
        }
      }
    ],
    "classification": "spark-env",
    "properties": {}
  },
  {
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "PYSPARK_PYTHON": "/usr/bin/python3"
        }
      }
    ],
    "classification": "yarn-env",
    "properties": {}
  },
  {
    "classification": "spark-defaults",
    "properties": {
      "spark.executor.extraClassPath": "/home/hadoop/*",
      "spark.driver.extraClassPath": "/home/hadoop/*",
      "spark.jars.packages": "org.scalaj:scalaj-http_2.10:2.3.0"
    }
  }
]

There is also a bootstrap step that copies the data source JAR (from the link above, which I uploaded to S3) to /home/hadoop on every node in the Spark cluster:

aws s3 cp s3://foo/spark-datasource-rest_2.11-2.1.0-SNAPSHOT.jar /home/hadoop/spark-datasource-rest_2.11-2.1.0-SNAPSHOT.jar

When I SSH into the master node and start a PySpark session (as ec2-user), I see the following messages:

sudo pyspark 
Python 3.6.7 (default, Dec 21 2018, 20:31:01) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.scalaj#scalaj-http_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-200abadc-d4a6-47dd-a2e9-110f77de3b4e;1.0
    confs: [default]
    found org.scalaj#scalaj-http_2.10;2.3.0 in central
:: resolution report :: resolve 140ms :: artifacts dl 5ms
    :: modules in use:
    org.scalaj#scalaj-http_2.10;2.3.0 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-200abadc-d4a6-47dd-a2e9-110f77de3b4e
    confs: [default]
    0 artifacts copied, 1 already retrieved (0kB/6ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/04/11 19:00:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/04/11 19:00:36 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
19/04/11 19:00:40 WARN Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.scalaj_scalaj-http_2.10-2.3.0.jar added multiple times to distributed cache.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.6.7 (default, Dec 21 2018 20:31:01)
SparkSession available as 'spark'.

Then I run my test code:

sodauri = 'https://soda.demo.socrata.com/resource/6yvf-kk3n.json'
sodainput1 = [("Nevada"), ("nn")]
sodainput2 = [("Northern California"), ("pr")]
sodainput3 = [("Virgin Islands region"), ("pr")]
sodainputRdd = spark.sparkContext.parallelize([sodainput1, sodainput2, sodainput3])
sodaDf = sodainputRdd.toDF(["region","source"])
sodaDf.createOrReplaceTempView('sodainputtbl')                         
prmsSoda = { 'url' : sodauri, 'input' : 'sodainputtbl', 'method' : 'GET', 'readTimeout' : '10000', 'connectionTimeout' : '2000', 'partitions' : '10'}
sodasDf = spark.read.format("org.apache.dsext.spark.datasource.rest.RestDataSource").options(**prmsSoda).load()
[Stage 6:>                                                          (0 + 1) / 1]19/04/11 20:34:39 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 8, ip-10-100-14-225.us-west-2.compute.internal, executor 3): java.lang.NoClassDefFoundError: scalaj/http/Http$
    at org.apache.dsext.spark.datasource.rest.RestConnectorUtil$.callRestAPI(RestConnectorUtil.scala:53)
    at org.apache.dsext.spark.datasource.rest.RESTRelation.org$apache$dsext$spark$datasource$rest$RESTRelation$$callRest(RestRelation.scala:128)
    at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
    at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
    at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
    at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: scalaj.http.Http$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 31 more

19/04/11 20:34:39 ERROR TaskSetManager: Task 0 in stage 6.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o193.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 11, ip-10-100-14-225.us-west-2.compute.internal, executor 3): java.lang.NoClassDefFoundError: scalaj/http/Http$
    at org.apache.dsext.spark.datasource.rest.RestConnectorUtil$.callRestAPI(RestConnectorUtil.scala:53)
    at org.apache.dsext.spark.datasource.rest.RESTRelation.org$apache$dsext$spark$datasource$rest$RESTRelation$$callRest(RestRelation.scala:128)
    at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
    at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
    at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
    at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$.infer(JsonInferSchema.scala:83)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:108)
    at org.apache.spark.sql.DataFrameReader$$anonfun$2.apply(DataFrameReader.scala:439)
    at org.apache.spark.sql.DataFrameReader$$anonfun$2.apply(DataFrameReader.scala:439)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:438)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:419)
    at org.apache.dsext.spark.datasource.rest.RESTRelation.<init>(RestRelation.scala:101)
    at org.apache.dsext.spark.datasource.rest.RestDataSource.createRelation(RestDataSource.scala:42)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: scalaj/http/Http$
    at org.apache.dsext.spark.datasource.rest.RestConnectorUtil$.callRestAPI(RestConnectorUtil.scala:53)
    at org.apache.dsext.spark.datasource.rest.RESTRelation.org$apache$dsext$spark$datasource$rest$RESTRelation$$callRest(RestRelation.scala:128)
    at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
    at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
    at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
    at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
>>> 

What is wrong with my configuration?

Update

I tried configuring the EMR cluster with scalaj-http_2.11, but I still get the same error. I also rebuilt the plugin, making sure it is built for Scala 2.11.12 and Spark 2.4, the same versions that run on EMR. I still get the same error.
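
For anyone comparing versions the same way, here is a minimal sketch of a check for the Scala binary suffix in package coordinates, assuming the usual `group:artifact_<scalaVersion>:version` naming. The helper names `scala_suffix` and `mismatched` are hypothetical and not part of Spark or EMR. It only flags suffix mismatches, such as the `_2.10` artifact in the configuration above against a Scala 2.11 cluster. It does not explain a missing class on its own.

```python
import re

# Matches a Scala binary-version suffix at the end of an artifact ID, e.g. "_2.11".
_SCALA_SUFFIX = re.compile(r"_(\d+\.\d+)$")


def scala_suffix(coordinate):
    """Return the Scala binary version encoded in a group:artifact:version
    coordinate, or None when the artifact ID carries no such suffix."""
    _group, artifact, _version = coordinate.split(":")
    match = _SCALA_SUFFIX.search(artifact)
    return match.group(1) if match else None


def mismatched(coordinates, cluster_scala):
    """List the coordinates whose Scala suffix differs from the cluster's version.
    Artifacts without a suffix (plain Java libraries) are not reported."""
    return [c for c in coordinates
            if scala_suffix(c) not in (None, cluster_scala)]


if __name__ == "__main__":
    # Value of spark.jars.packages from the configuration in the question.
    packages = ["org.scalaj:scalaj-http_2.10:2.3.0"]
    # "2.11" is the Scala version the question reports for the EMR cluster.
    print(mismatched(packages, "2.11"))
```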

I am now looking at the warning messages in case they point to the problem:

Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.

Same path resource file:///home/ec2-user/.ivy2/jars/org.scalaj_scalaj-http_2.10-2.3.0.jar added multiple times to distributed cache.

1 Answer

1 vote
/ April 12, 2019

I got it working.

It seems the packages configuration was not taking effect. Instead, I copied the required scalaj-http_2.11:2.3.0 JAR from the Maven repository downloads into the same directory the custom data source plugin was copied to. It now works (the spark.jars.packages line has been removed from the EMR configuration).
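
The workaround can be sketched roughly as follows. The function names `maven_central_url` and `spark_defaults` are hypothetical. The download URL assumes the standard Maven Central layout on repo1.maven.org (the answer does not say exactly where the JAR was downloaded from), and `/home/hadoop` is the directory used in the question. The generated `spark-defaults` block keeps the two `extraClassPath` entries and omits `spark.jars.packages`.

```python
import json


def maven_central_url(coordinate):
    """Build a download URL for a group:artifact:version coordinate.
    Assumption: the JAR is fetched from Maven Central (repo1.maven.org),
    which uses the layout <group path>/<artifact>/<version>/<artifact>-<version>.jar."""
    group, artifact, version = coordinate.split(":")
    return "https://repo1.maven.org/maven2/{}/{}/{}/{}-{}.jar".format(
        group.replace(".", "/"), artifact, version, artifact, version)


def spark_defaults(jar_dir):
    """Return a spark-defaults classification that puts every JAR in jar_dir
    on the driver and executor classpath, without spark.jars.packages."""
    classpath = jar_dir.rstrip("/") + "/*"
    return {
        "classification": "spark-defaults",
        "properties": {
            "spark.executor.extraClassPath": classpath,
            "spark.driver.extraClassPath": classpath,
        },
    }


if __name__ == "__main__":
    # JAR to place next to the data source plugin on every node.
    print(maven_central_url("org.scalaj:scalaj-http_2.11:2.3.0"))
    # Configuration block to use in place of the original spark-defaults entry.
    print(json.dumps(spark_defaults("/home/hadoop"), indent=2))
```

The dependency JAR still has to reach every node, for example by adding it to the same bootstrap step that already copies the plugin JAR.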

...