scala spark NoClassDefFoundError - InitialPositionInStream
0 votes
/ February 6, 2020

I am deploying a Spark application written in Scala to an EMR cluster with the command below, and I cannot figure out why I get a missing-dependency error when it runs on the cluster instance.

The error message:

User class threw exception: java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream

The deploy command:

aws emr add-steps --cluster-id j-xxxxxxx --steps Type=spark,Name=ScalaStream,Args=[\
--class,"ScalaStream",\
--deploy-mode,cluster,\
--master,yarn,\
--jars,s3://xxx.xxx.xxx/aws-java-sdk-1.11.715.jar,\
--conf,spark.yarn.submit.waitAppCompletion=false,\
s3://xxx.xxxx.xxxx/simple-project_2.12-1.0.jar\
],ActionOnFailure=CONTINUE

and the sbt file:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "2.4.4"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "2.4.4"
libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.11.715"
libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl_2.12" % "2.4.4"

Partial code below:

...

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kinesis.KinesisUtils

...

        val streamingContext = new StreamingContext(sparkContext, batchInterval)

        // Populate the appropriate variables from the given args
        val streamAppName = "xxxxxx"
        val streamName = "xxxxxx"
        val endpointUrl = "https://kinesis.xxxxx.amazonaws.com"
        val regionName = "xx-xx-x"
        val initialPosition = InitialPositionInStream.LATEST
        val checkpointInterval = batchInterval
        val storageLevel = StorageLevel.MEMORY_AND_DISK_2

        val kinesisStream = KinesisUtils.createStream(streamingContext, streamAppName, streamName, endpointUrl, regionName, initialPosition, checkpointInterval, storageLevel)
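
For context, a minimal sketch of how such a DStream is typically consumed and the job started; the record handling and the `lines` value here are assumptions for illustration, not taken from the original code:

        // Assumed continuation: decode each Kinesis record (an Array[Byte]) and print it,
        // then start the streaming context and block until termination.
        val lines = kinesisStream.map(record => new String(record, java.nio.charset.StandardCharsets.UTF_8))
        lines.print()

        streamingContext.start()
        streamingContext.awaitTermination()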

The full stack trace from the application master log:

20/02/05 21:43:10 ERROR ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream
java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream
    at ScalaStream$.main(stream.scala:32)
    at ScalaStream.main(stream.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more
20/02/05 21:43:10 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream
    at ScalaStream$.main(stream.scala:32)
    at ScalaStream.main(stream.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more
)

I have tried including the AWS dependencies both in the sbt file and via the --jars parameter of spark-submit, but I cannot figure out why the dependency is still missing.

1 Answer

1 vote
/ February 6, 2020

Fixed by updating the following.

sbt

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "2.4.4"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "2.4.4"
libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl_2.12" % "2.4.4"
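
A side note, not part of the original answer: the missing class, InitialPositionInStream, lives in amazon-kinesis-client, which spark-streaming-kinesis-asl pulls in transitively, so it is the connector that has to reach the cluster. Since EMR already ships Spark and the connector now arrives via --packages at submit time, one common variant (an assumption, adjust to taste) is to mark these libraries as provided so they are only used at compile time:

libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "2.4.4" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "2.4.4" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl_2.12" % "2.4.4" % "provided"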

deployment script

aws emr add-steps --cluster-id j-xxxxxxx --steps Type=spark,Name=ScalaStream,Args=[\
--class,"ScalaStream",\
--deploy-mode,cluster,\
--master,yarn,\
--packages,\'org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.0,org.postgresql:postgresql:42.2.9,com.facebook.presto:presto-jdbc:0.60\',\
--conf,spark.yarn.submit.waitAppCompletion=false,\
--conf,yarn.log-aggregation-enable=true,\
--conf,spark.dynamicAllocation.enabled=true,\
--conf,spark.cores.max=4,\
--conf,spark.network.timeout=300,\
s3://xxx.xxx/simple-project_2.12-1.0.jar\
],ActionOnFailure=CONTINUE

The key is the --packages flag added to aws emr add-steps. I had mistakenly assumed that sbt package would bundle the required dependencies.
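
If you would rather not rely on --packages resolving artifacts at submit time, an alternative is to build a fat JAR with sbt-assembly so the Kinesis classes travel inside the application jar itself. A minimal sketch, with the plugin version and merge strategy as assumptions:

project/plugins.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

build.sbt (with the Spark dependencies marked provided as above, but the connector left unmarked so it gets bundled):

// Bundle the Kinesis connector and, transitively, amazon-kinesis-client into the fat JAR.
libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl_2.12" % "2.4.4"

// Discard conflicting metadata/signature files when merging dependency jars.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

Then `sbt assembly` produces an -assembly- jar under target/scala-2.12/, which can be submitted without the --packages flag.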

...