Я новичок в Kafka и пытаюсь реализовать потребительскую логику Kafka в spark2, и когда я запускаю весь свой код в оболочке и запускаю потоковую передачу, он ничего не показывает.
Я просмотрел много сообщений в StackOverflow, но ничегопомог мне. Я даже загрузил все jar-файлы зависимостей из maven и попытался запустить, но он по-прежнему ничего не показывает.
Spark Версия: 2.2.0 Scala версия 2.11.8 jar, которые я скачал, являются kafka-clients-2.2.0.jarи spark-streaming-kafka-0-10_2.11-2.2.0.jar
но я все еще сталкиваюсь с той же проблемой.
Пожалуйста, найдите фрагмент кода ниже
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kafka010.{KafkaUtils, ConsumerStrategies, LocationStrategies}
val brokers = "host1:port, host2:port"
val groupid = "default"
val topics = "kafka_sample"
val topicset = topics.split(",").toSet
val ssc = new StreamingContext(sc, Seconds(2))
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupid,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
val msg = KafkaUtils.createDirectStream[String, String](
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicset, kafkaParams)
)
msg.foreachRDD{
rdd => rdd.collect().foreach(println)
}
ssc.start()
Я ожидаю запуска SparkStreaming, но он ничего не делает. Какую ошибку я сделал здесь? Или это известная проблема?