Как обработать ввод DStream, разделив его на RDD для печати сообщений Kafka? - PullRequest
0 голосов
/ 31 марта 2019

Я создаю Spark Streaming Application и хочу обрабатывать каждое сообщение Кафки отдельно. Сейчас я хочу просто записать значения сообщений, чтобы проверить, что все работает. Но когда я пытаюсь напечатать значения сообщения, я получаю java.lang.NullPointerException. У меня есть условие пропустить пустые СДР. Проблема со строкой logger.warn(s"Message ${message.value.toString}"), потому что без нее все работает нормально. Что может быть причиной исключения?

Моя идея состоит в том, чтобы разделить входные данные DStream на отдельные RDD, поэтому я использую метод foreachRDD. Используя его, я должен получить RDD ConsumerRecord, где первый элемент является ключевым, а второй - фактическим значением. Затем я начинаю обработку каждого ConsumerRecord, пытаясь извлечь значение сообщения и распечатать его.

val kafkaStream = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

kafkaStream.foreachRDD( (rdd: RDD[ConsumerRecord[String, String]], time: Time) =>
    if(!rdd.isEmpty){
      rdd.foreach(message =>
        if(!message.value().isEmpty){
          logger.warn(s"Message ${message.value.toString}")
        }else{
          logger.warn(s"Empty value")
        }
      )
    }else{
      logger.warn(s"Empty rdd")
    }
  )
...