Я создаю Spark Streaming Application и хочу обрабатывать каждое сообщение Кафки отдельно. Сейчас я хочу просто записать значения сообщений, чтобы проверить, что все работает. Но когда я пытаюсь напечатать значения сообщения, я получаю java.lang.NullPointerException
. У меня есть условие пропустить пустые СДР. Проблема со строкой logger.warn(s"Message ${message.value.toString}")
, потому что без нее все работает нормально. Что может быть причиной исключения?
Моя идея состоит в том, чтобы разделить входные данные DStream
на отдельные RDD, поэтому я использую метод foreachRDD
. Используя его, я должен получить RDD ConsumerRecord
, где первый элемент является ключевым, а второй - фактическим значением. Затем я начинаю обработку каждого ConsumerRecord
, пытаясь извлечь значение сообщения и распечатать его.
val kafkaStream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
kafkaStream.foreachRDD( (rdd: RDD[ConsumerRecord[String, String]], time: Time) =>
if(!rdd.isEmpty){
rdd.foreach(message =>
if(!message.value().isEmpty){
logger.warn(s"Message ${message.value.toString}")
}else{
logger.warn(s"Empty value")
}
)
}else{
logger.warn(s"Empty rdd")
}
)