Я не могу отправить работу Spark через spark-submit в EMR. Моя искра-отправка выглядит так -
sudo spark-submit --class timeusage.TimeUsage \
--deploy-mode cluster --master yarn \
--num-executors 2 --conf spark.executor.cores=2 \
--conf spark.executor.memory=2g --conf spark.driver.memory=1g \
--conf spark.driver.cores=1 --conf spark.logConf=true \
--conf spark.yarn.appMasterEnv.SPARKMASTER=yarn \
--conf spark.yarn.appMasterEnv.WAREHOUSEDIR=s3a://whbucket/spark-warehouse \
--conf spark.yarn.appMasterEnv.S3AACCESSKEY=xxx \
--conf spark.yarn.appMasterEnv.S3ASECRETKEY=yyy \
--jars s3://bucket/week3-assembly-0.1.0-SNAPSHOT.jar \
s3:/bucket/week3-assembly-0.1.0-SNAPSHOT.jar \
s3a://sbucket/atussum.csv
Ошибка выглядит так -
19/06/04 07:36:59 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, ip-172-31-66-110.ec2.internal, executor 1): java.lang.ExceptionInInitializerError
at timeusage.TimeUsage$$anonfun$8.apply(TimeUsage.scala:70)
at timeusage.TimeUsage$$anonfun$8.apply(TimeUsage.scala:70)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Library directory '/mnt/yarn/usercache/root/appcache/application_1559614942233_0036/container_1559614942233_0036_02_000002/assembly/target/scala-2.11/jars' does not exist; make sure Spark is built.
at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:248)
at org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(CommandBuilderUtils.java:342)
at org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(YarnCommandBuilderUtils.scala:38)
at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:543)
at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:863)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:177)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:178)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:501)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:936)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:927)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:927)
at timeusage.TimeUsage$.<init>(TimeUsage.scala:23)
at timeusage.TimeUsage$.<clinit>(TimeUsage.scala)
... 23 more
Я проверил, что все зависимости сборки моего проекта верны. И проект работает на местном [*].
Это первый раз, когда я работаю с многомодульным проектом SBT - я не уверен, что это как-то связано с этим?
Я добавил сборочный JAR для выполнения в конфигурацию --jars, но это никак не повлияло.
Мой build.sbt здесь - https://github.com/kevvo83/scala-spark-ln/blob/master/build.sbt
Ожидаемый результат: проект завершается и создает таблицы Hive на этапе S3.
Я все еще занимаюсь расследованием и буду публиковать здесь обновления, как только они появятся
После ответа Харша я добавил эти 2 строки в свою команду spark-submit -
--files /usr/lib/spark/conf/hive-site.xml \
--jars s3://bucket/week3-assembly-0.1.0-SNAPSHOT.jar \
Теперь ошибка трассировки стека -
19/06/06 10:37:55 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, ip-172-31-76-146.ec2.internal, executor 1): java.lang.NoClassDefFoundError: Could not initialize class *timeusage.TimeUsage*$
at timeusage.TimeUsage$$anonfun$8.apply(TimeUsage.scala:70)
at timeusage.TimeUsage$$anonfun$8.apply(TimeUsage.scala:70)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
(FYI timeusage.TimeUsage - мой класс в JAR). Что-нибудь еще, что я должен включить, чтобы гарантировать, что мои классовые определения получают повышение?
ОБНОВЛЕНИЕ: Я получил это на работу - я думаю, что последние 3 конф в приведенном ниже фрагменте кода - это то, что сработало (основываясь на том, как документы говорят, что Spark загружает Jars в промежуточную область на HDFS для доступа Исполнителей).
--conf spark.executorEnv.SPARK_HOME=/usr/lib/spark/
--conf spark.yarn.jars=/usr/lib/spark/jars/*.jar
--conf spark.network.timeout=600000
--files /usr/lib/spark/conf/spark-defaults.conf
Далее, spark-submit выполняет Jar с локального диска, а не из корзины S3, как я делал это ранее неправильно.
Пометка ответа как правильного, поскольку он поставил меня на правильный путь к его решению.