Почему потребительский код kafka зависает при запуске spark stream? - PullRequest
0 голосов
/ 17 октября 2019

Я новичок в Kafka и пытаюсь реализовать потребительскую логику Kafka в spark2, и когда я запускаю весь свой код в оболочке и запускаю потоковую передачу, он ничего не показывает.

Я просмотрел много сообщений в StackOverflow, но ничегопомог мне. Я даже загрузил все jar-файлы зависимостей из maven и попытался запустить, но он по-прежнему ничего не показывает.

Spark Версия: 2.2.0 Scala версия 2.11.8 jar, которые я скачал, являются kafka-clients-2.2.0.jarи spark-streaming-kafka-0-10_2.11-2.2.0.jar

но я все еще сталкиваюсь с той же проблемой.

Пожалуйста, найдите фрагмент кода ниже

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kafka010.{KafkaUtils, ConsumerStrategies, LocationStrategies}

val brokers = "host1:port, host2:port"
val groupid = "default"
val topics = "kafka_sample"
val topicset = topics.split(",").toSet

val ssc = new StreamingContext(sc, Seconds(2))

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupid,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)

val msg = KafkaUtils.createDirectStream[String, String](
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicset, kafkaParams)
)

msg.foreachRDD{
rdd => rdd.collect().foreach(println)
}
ssc.start()

Я ожидаю запуска SparkStreaming, но он ничего не делает. Какую ошибку я сделал здесь? Или это известная проблема?

Ответы [ 2 ]

1 голос
/ 17 октября 2019

Водитель будет бездействовать, если вы не позвоните ssc.awaitTermination() в конце. Если вы используете spark-shell, то это не очень хороший инструмент для потоковых заданий. Пожалуйста, используйте интерактивные инструменты, такие как Zeppelin или Spark notebook, для взаимодействия с потоковой передачей или попробуйте создать приложение в виде jar-файла, а затем развернуть.

Кроме того, если вы пытаетесь использовать потоковую потоковую обработку, структурированная потоковая передача будет лучшедовольно легко играть с.

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

0 голосов
/ 11 ноября 2019
  1. После ssc.start() используйте ssc.awaitTermination() в своем коде.
  2. Для тестирования напишите свой код в Main Object и запустите его в любой IDE, такой как Intellij
  3. Вы можете использовать командную оболочку и публиковать сообщения от производителя Kafka.

Я написал все эти шаги в простом примере в сообщении в блоге с рабочим кодом в GitHub. Пожалуйста, обратитесь к: http://softwaredevelopercentral.blogspot.com/2018/10/spark-streaming-and-kafka-integration.html

...