Я использовал spark-sql-kafka-0-10
для чтения партии из Кафки с Spark 2.4
и scala 2.11.12
.Таким образом, мой build.sbt
файл имеет следующие зависимости.
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
Я также использовал плагин sbt-assembly
для создания толстой фляги моего приложения.Он хорошо работает при развертывании этого jar-файла на локальном спарке, как показано ниже, где $ FAT_JAR указывает на мой файл сборки:
./spark-submit --class $MAIN_CLASS --master local --driver-class-path $FAT_JAR $FAT_JAR
Но когда я развертываю его в кластере (даже если и рабочий, и мастер находятся на одном компьютере)это исключение о проблеме десериализации TopicPartiton
.
Как я работаю в кластере:
./spark-submit \
--master spark://spark-master:7077 \
--class $MAIN_CLASS \
--driver-class-path $FAT_JAR \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
$FAT_JAR
Я также пробовал --jars
, и я уверен, что рабочий и мастер имеют версию сохранения kafka-client
, которая 2.0.0
Журнал исключений:
Caused by: java.io.InvalidClassException: org.apache.kafka.common.TopicPartition; class invalid for deserialization
at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:169)
at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:874)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2043)
Почему искра не десериализуется TopicPartition
и как я могу ее решить?