Развертывание приложения spark, записанного в scala, в кластер EMR с помощью следующей команды, и я не могу понять, почему я получаю сообщение об ошибке отсутствующей зависимости при развертывании в экземпляре кластера EMR.
сообщение об ошибке :
User class threw exception: java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream
aws emr add-steps --cluster-id j-xxxxxxx --steps Type=spark,Name=ScalaStream,Args=[\
--class,"ScalaStream",\
--deploy-mode,cluster,\
--master,yarn,\
--jars,s3://xxx.xxx.xxx/aws-java-sdk-1.11.715.jar,\
--conf,spark.yarn.submit.waitAppCompletion=false,\
s3://xxx.xxxx.xxxx/simple-project_2.12-1.0.jar\
],ActionOnFailure=CONTINUE
и файл sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.8"
libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "2.4.4"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "2.4.4"
libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.11.715"
libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl_2.12" % "2.4.4"
частичный код ниже
...
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
...
val streamingContext = new StreamingContext(sparkContext, batchInterval)
// Populate the appropriate variables from the given args
val streamAppName = "xxxxxx"
val streamName = "xxxxxx"
val endpointUrl = "https://kinesis.xxxxx.amazonaws.com"
val regionName = "xx-xx-x"
val initialPosition = InitialPositionInStream.LATEST
val checkpointInterval = batchInterval
val storageLevel = StorageLevel.MEMORY_AND_DISK_2
val kinesisStream = KinesisUtils.createStream(streamingContext, streamAppName, streamAppName, endpointUrl, regionName, initialPosition, checkpointInterval, storageLevel)
val initialPosition = InitialPositionInStream.LATEST
val checkpointInterval = batchInterval
val storageLevel = StorageLevel.MEMORY_AND_DISK_2
val kinesisStream = KinesisUtils.createStream(streamingContext, streamAppName, streamAppName, endpointUrl, regionName, initialPosition, checkpointInterval, storageLevel)
20/02/05 21:43:10 ERROR ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream
java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream
at ScalaStream$.main(stream.scala:32)
at ScalaStream.main(stream.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more
20/02/05 21:43:10 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream
at ScalaStream$.main(stream.scala:32)
at ScalaStream.main(stream.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more
)
Я пробовал включить зависимости aws как в файле sbt, так и в параметре --jars в spark-submit, но не может понять, почему отсутствует зависимость?