Не использовать сообщение от spark-streaming-kafka-0-10 с опцией kafka.bootstrap.servers - PullRequest
0 голосов
/ 30 октября 2018

Я использую kafka 1.0.1-kafka-3.1.0-SNAPSHOT из CDH (дистрибутив cloudera для hadoop)

На моем пограничном сервере batch-1 Я могу выдавать сообщений с:

kafka-console-producer --broker-list batch-1:9092 --topic MyTopic

Я могу потреблять сообщений благодаря Zookeeper на моем первом узле с:

kafka-console-consumer --zookeeper data1:2181 --topic MyTopic --from-beginning

Но я получаю ничего с опцией bootstrap-server :

kafka-console-consumer --bootstrap-server batch-1:9092 --topic MyTopic --from-beginning

Проблема в том, что я использую Кафку на свече:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"

val df = spark.readStream
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .option("kafka.bootstrap.servers", "batch-1:9092")
  .option("subscribe", "MyTopic")
  .load()

println("Select :")

val df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)")
  .as[(String, String, String)]

println("Show :")

val query = df2.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()

Я сделал export SPARK_KAFKA_VERSION=0.10 на своем краю. Тогда

spark2-submit --driver-memory 2G --jars spark-sql-kafka-0-10_2.11-2.3.0.cloudera4.jar --class "spark.streaming.Poc" poc_spark_kafka_2.11-0.0.1.jar

Это вынуждает меня использовать kafka.bootstrap.servers, похоже, что оно подключено, но я не могу получить никакого сообщения.

Выход такой же, как у kafka-console-consumer с опцией --bootstrap-server:

18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka commitId : unknown
18/10/30 16:11:48 INFO streaming.MicroBatchExecution: Starting new streaming query.

Тогда ничего. Должен ли я подключиться к Zookeeper? Как ?

Есть ли конфликт версий, в котором говорится: «Руководство по интеграции Structured Streaming + Kafka (версия брокера Kafka 0.10.0 или выше )» здесь: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html?

Что я пропустил?

1 Ответ

0 голосов
/ 31 октября 2018

РЕШЕНИЕ

/var/log/kafka/kafka-broker-batch-1.log сказал:

2018-10-31 13:40:08,284 ERROR kafka.server.KafkaApis: [KafkaApi-51] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.

Итак, я развернул 3 брокера на узлах кластера со шлюзом на границе, теперь он работает с:

kafka-console-producer --broker-list data1:9092,data2:9092,data3:9092 --topic Test

kafka-console-consumer --bootstrap-server data1:9092 --topic Test --from-beginning

Spark тоже отлично работает.

...