Как переделить Spark DStream Kafka ConsumerRecord RDD - PullRequest
0 голосов
/ 28 сентября 2018

Я получаю неравномерный размер тем Кафки.Мы хотим перераспределить входной RDD на основе некоторой логики.

Но когда я пытаюсь применить перераспределение, я получаю object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord ошибку.

Я нашел следующий обходной путь

Задание прервано из-за сбоя этапа: задание не сериализуемо

Вызовите rdd.forEachPartition и создайте там NotSerializable объект, подобный этому:

rdd.forEachPartition(iter -> {
  NotSerializable notSerializable = new NotSerializable();

  // ...Now process iter
});

ВЫШЕ ПРИМЕНИМОЙ ЛОГИКЕЗДЕСЬ не уверен, что я что-то пропустил

val stream =KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam) ).map(_.value())
      stream.foreachRDD { rdd =>
        val repartitionRDD = flow.repartitionRDD(rdd,1)
        println("&&&&&&&&&&&&&& repartitionRDD " + repartitionRDD.count())
       val modifiedRDD = rdd.mapPartitions { 
          iter =>{
            val customerRecords: List[ConsumerRecord[String, String]] = List[ConsumerRecord[String, String]]()
             while(iter.hasNext){
                  val consumerRecord :ConsumerRecord[String, String] = iter.next()
                  customerRecords:+ consumerRecord
             }
             customerRecords.iterator
          }
        }
        val r = modifiedRDD.repartition(1)
        println("************* after repartition " + r.count())

НО все равно получаю тот же объект, не сериализуемую ошибку.Любая помощь очень ценится.

  1. Я пытался сделать поток переходным, но это также не помогло решить проблему.

  2. Я сделал тестовый класс как Serializable, но не исправилвопрос.

...