Я использую kafka 1.0.1-kafka-3.1.0-SNAPSHOT
из CDH (дистрибутив cloudera для hadoop)
На моем пограничном сервере batch-1 Я могу выдавать сообщений с:
kafka-console-producer --broker-list batch-1:9092 --topic MyTopic
Я могу потреблять сообщений благодаря Zookeeper на моем первом узле с:
kafka-console-consumer --zookeeper data1:2181 --topic MyTopic --from-beginning
Но я получаю ничего с опцией bootstrap-server :
kafka-console-consumer --bootstrap-server batch-1:9092 --topic MyTopic --from-beginning
Проблема в том, что я использую Кафку на свече:
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
val df = spark.readStream
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("kafka.bootstrap.servers", "batch-1:9092")
.option("subscribe", "MyTopic")
.load()
println("Select :")
val df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)")
.as[(String, String, String)]
println("Show :")
val query = df2.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
Я сделал export SPARK_KAFKA_VERSION=0.10
на своем краю. Тогда
spark2-submit --driver-memory 2G --jars spark-sql-kafka-0-10_2.11-2.3.0.cloudera4.jar --class "spark.streaming.Poc" poc_spark_kafka_2.11-0.0.1.jar
Это вынуждает меня использовать kafka.bootstrap.servers
, похоже, что оно подключено, но я не могу получить никакого сообщения.
Выход такой же, как у kafka-console-consumer
с опцией --bootstrap-server
:
18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka commitId : unknown
18/10/30 16:11:48 INFO streaming.MicroBatchExecution: Starting new streaming query.
Тогда ничего.
Должен ли я подключиться к Zookeeper? Как ?
Есть ли конфликт версий, в котором говорится: «Руководство по интеграции Structured Streaming + Kafka (версия брокера Kafka 0.10.0 или выше )» здесь: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html?
Что я пропустил?