У меня есть задача прочитать каждое сообщение в теме Кафки, которая имеет 3 раздела.У меня есть Spark и Scala в качестве инструментов для этого.
Пока у меня есть следующий код:
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe
[String, String](cdrTopic, kafkaParams))
kafkaStream.foreachRDD { rdd => {
if (!rdd.isEmpty()) {
rdd.foreachPartition(iterator => {
while (iterator.hasNext) {
val partition = iterator.next().value().split("\\n")
if (!partition.isEmpty) {
partition.foreach(string => {
if (!string.isEmpty) {
//process every message here
}
})
}
println("no partition data")
}
})
}
println("no rdd data to process")
}
}
Каждое сообщение представляет собой строку tsv (значения, разделенные табуляцией), которую мне нужно проанализировать и сохранить в db.
Я считаю, что это очень неэффективно, потому что я использую 4 цикла, и я думаю, что некоторые данные теряются, так как количество сообщений, записываемых в БД, очень мало.
Я использую scala версии 2.11.12, spark-streaming 2.4.0, spark-streaming-kafka 2.4.0.
Существует ли эффективный способ чтения и анализа каждого сообщения в kafka с использованием spark?