Spark on EMR doesn't find my Python modules as of EMR 5.11
0 votes
/ June 7, 2018

I have been running pyspark on AWS EMR since EMR 5.3 and never ran into this problem until I upgraded to EMR 5.11 (or newer). This is the full stack trace:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 34, ip-10-1-156-139.ec2.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1528361666913_0002/container_1528361666913_0002_01_000002/pyspark.zip/pyspark/worker.py", line 216, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1528361666913_0002/container_1528361666913_0002_01_000002/pyspark.zip/pyspark/worker.py", line 58, in read_command
    command = serializer._read_with_length(file)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1528361666913_0002/container_1528361666913_0002_01_000002/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1528361666913_0002/container_1528361666913_0002_01_000002/pyspark.zip/pyspark/serializers.py", line 562, in loads
    return pickle.loads(obj)
ImportError: No module named custom.custom

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:295)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:266)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1750)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1738)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1737)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1737)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:871)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1971)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1920)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1909)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:682)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)

The exact same code runs without issue on EMR 5.10. I launch the pyspark application with the following EMR steps:

[
  {
    "Name": "Init",
     "HadoopJarStep": {
       "Jar": "command-runner.jar",
       "Args": ["/bin/sh", "-c",
         "sudo -E mkdir /mnt/tmp/app/code&&sudo aws s3 cp s3://bucket/entry.py /mnt/tmp/app/code/entry.py&&sudo aws s3 cp s3://bucket/code-1.0-py2.7.egg /mnt/tmp/app/code/code-1.0-py2.7.egg"]
     },
    "ActionOnFailure": "CANCEL_AND_WAIT"
  },
  {
    "Name": "Run SPARK Program",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": ["/bin/sh", "-c",
               "spark-submit --py-files local:///mnt/tmp/app/code/code-1.0-py2.7.egg local:///mnt/tmp/app/code/entry.py"]
    },
    "ActionOnFailure": "CANCEL_AND_WAIT"
  }
]
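
For what it's worth, one variant that avoids local:// paths entirely is to let spark-submit fetch the egg and the entry script from S3 itself (command-runner accepts s3:// URIs on EMR). This is only a sketch using the same illustrative bucket layout as above, not something I have verified on 5.11:

{
  "Name": "Run SPARK Program (py-files from S3)",
  "HadoopJarStep": {
    "Jar": "command-runner.jar",
    "Args": ["/bin/sh", "-c",
             "spark-submit --py-files s3://bucket/code-1.0-py2.7.egg s3://bucket/entry.py"]
  },
  "ActionOnFailure": "CANCEL_AND_WAIT"
}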

Update: I managed to work around the problem by shipping the application code to the executors myself and pointing the Python path at it, but I don't think that is the right approach, because I never had to do this on earlier EMR releases.
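
Concretely, the workaround looks roughly like this: the egg has to be present at the same path on every node (the Init step above only runs on the master, so the copy has to happen in a bootstrap action as well), and then PYTHONPATH is pushed to the driver and the executors through the standard Spark properties spark.yarn.appMasterEnv.* and spark.executorEnv.*. Paths are the illustrative ones from above:

{
  "Name": "Run SPARK Program (PYTHONPATH workaround)",
  "HadoopJarStep": {
    "Jar": "command-runner.jar",
    "Args": ["/bin/sh", "-c",
             "spark-submit --conf spark.yarn.appMasterEnv.PYTHONPATH=/mnt/tmp/app/code/code-1.0-py2.7.egg --conf spark.executorEnv.PYTHONPATH=/mnt/tmp/app/code/code-1.0-py2.7.egg local:///mnt/tmp/app/code/entry.py"]
  },
  "ActionOnFailure": "CANCEL_AND_WAIT"
}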

...