Я учу Кафку в Скале. Прикрепленный код является просто реализацией подсчета слов с использованием Kafka и Spark Streaming. Как у меня есть отдельное потребительское выполнение за раздел во время потоковой передачи? Пожалуйста, помогите!
Вот мой код:
class ConsumerM(topics: String, bootstrap_server: String, group_name: String) {
Logger.getLogger("org").setLevel(Level.ERROR)
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
.setMaster("local[*]")
.set("spark.executor.memory","1g")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val topicsSet = topics.split(",")
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrap_server,
ConsumerConfig.GROUP_ID_CONFIG -> group_name,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
"auto.offset.reset" ->"earliest")
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}