Я получаю неравномерный размер тем Кафки.Мы хотим перераспределить входной RDD на основе некоторой логики.
Но когда я пытаюсь применить перераспределение, я получаю object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord
ошибку.
Я нашел следующий обходной путь
Задание прервано из-за сбоя этапа: задание не сериализуемо
Вызовите rdd.forEachPartition
и создайте там NotSerializable
объект, подобный этому:
rdd.forEachPartition(iter -> {
NotSerializable notSerializable = new NotSerializable();
// ...Now process iter
});
ВЫШЕ ПРИМЕНИМОЙ ЛОГИКЕЗДЕСЬ не уверен, что я что-то пропустил
val stream =KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam) ).map(_.value())
stream.foreachRDD { rdd =>
val repartitionRDD = flow.repartitionRDD(rdd,1)
println("&&&&&&&&&&&&&& repartitionRDD " + repartitionRDD.count())
val modifiedRDD = rdd.mapPartitions {
iter =>{
val customerRecords: List[ConsumerRecord[String, String]] = List[ConsumerRecord[String, String]]()
while(iter.hasNext){
val consumerRecord :ConsumerRecord[String, String] = iter.next()
customerRecords:+ consumerRecord
}
customerRecords.iterator
}
}
val r = modifiedRDD.repartition(1)
println("************* after repartition " + r.count())
НО все равно получаю тот же объект, не сериализуемую ошибку.Любая помощь очень ценится.
Я пытался сделать поток переходным, но это также не помогло решить проблему.
Я сделал тестовый класс как Serializable, но не исправилвопрос.