У меня есть очень простое и простое приложение, в котором у меня есть тема1 и тема2, потоковое приложение, которое читает из темы1, и другое приложение, которое берет результат и отправляет его в тему 2.
Чтение:
import org.apache.spark.sql.{DataFrame, SparkSession}
object ReadKafkaTopic {
def readStream(spark: SparkSession, brokers: String, topic: String): DataFrame = {
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("failOnDataLoss", false)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.load()
}
}
Запись:
import org.apache.spark.sql.{DataFrame}
object WriteKafkaTopic {
def writeStream(df: DataFrame, brokers: String, topic: String): Unit = {
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("topic", topic)
.option("checkpointLocation", "/tmp/checkpoint")
.start()
}
}
Это прекрасно работает.Теперь я пытаюсь «проверить» его с помощью локальной проверки кафки для моих тестов (используя net.manub.embeddedkafka.EmbeddedKafka)
И когда я запускаю свои тесты, я получаю эту ошибку:
Cause: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8
Таким образом, после прочтения в сети схожих проблем я передал все свои зависимости Джексона в 2.6.7 (конечно, попробовав разные варианты):
scalaVersion := "2.12.8"
val sparkVersion = "2.4.2"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.2.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "provided"
// testing dependencies
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
libraryDependencies += "io.github.embeddedkafka" %% "embedded-kafka" % "2.2.0" % "test"
// https://mvnrepository.com/artifact/org.apache.curator/curator-test
libraryDependencies += "org.apache.curator" % "curator-test" % "4.2.0" % Test
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-annotations" % "2.6.7"
//
//// https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8
dependencyOverrides += "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.6.7"
Но я продолжаю работать с той же ошибкой Несовместимая версия Jackson: 2.9.8
Кто-нибудь сталкивался с этой проблемой и знает, как ее исправить?