MicroBatchExecution Spark Структурированная потоковая передача с Kafka 2.4.0 - PullRequest
0 голосов
/ 21 января 2019

Когда я пытаюсь запустить потоковое приложение Spark с интеграцией Kafka, я получаю эту ошибку:

ERROR MicroBatchExecution: Query [id = ff14fce6-71d3-4616-bd2d-40f07a85a74b, runId = 42670f29-21a9-4f7e-abd0-66ead8807282] terminated with error
java.lang.IllegalStateException: No entry found for connection 2147483647

Почему это происходит?Может ли это быть проблема некоторых зависимостей?

Мой файл build.sbt выглядит следующим образом:

name := "SparkAirflowK8s"

version := "0.1"

scalaVersion := "2.12.7"

val sparkVersion = "2.4.0"
val circeVersion = "0.11.0"

dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.9.8"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.8"
dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.12" % "2.9.8"

resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
resolvers += "confluent" at "http://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "org.apache.kafka" %% "kafka" % "2.1.0",
  "org.scalatest" %% "scalatest" % "3.2.0-SNAP10" % "it, test",
  "org.scalacheck" %% "scalacheck" % "1.14.0" % "it, test",
  "io.kubernetes" % "client-java" % "3.0.0" % "it",
  "org.json" % "json" % "20180813",
  "io.circe" %% "circe-core" % circeVersion,
  "io.circe" %% "circe-generic" % circeVersion,
  "io.circe" %% "circe-parser" % circeVersion,
  "org.apache.avro" % "avro" % "1.8.2",
  "io.confluent" % "kafka-avro-serializer" % "5.0.1"
)

Вот часть кода:

val sparkConf = new SparkConf()
      .setMaster(args(0))
      .setAppName("KafkaSparkJob")

    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

val avroStream = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topic1")
      .load()

val outputExample = avroStream
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

    outputExample.awaitTermination()

1 Ответ

0 голосов
/ 22 января 2019

Я изменил localhost на службу NodePort, определенную для развертывания Kafka.Теперь это исключение не появляется.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...