У меня есть pairRdd
, постоянно получающий данные, я хочу выводить его содержимое в тему kafka каждые x минут, затем удаляю его содержимое.
Я пробовал пару вещей, но каждый раз это выдавало эту ошибку.
Исключение в потоке "Таймер-1" org.apache.spark.SparkException: задача не сериализуема
в org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 298)
в org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean (ClosureCleaner.scala: 288)
в org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 108)
в org.apache.spark.SparkContext.clean (SparkContext.scala: 2287)
в org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply (RDD.scala: 925)
в org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply (RDD.scala: 924)
в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151)
в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 112)
в org.apache.spark.rdd.RDD.withScope (RDD.scala: 362)
в org.apache.spark.rdd.RDD.foreachPartition (RDD.scala: 924)
в org.apache.spark.api.java.JavaRDDLike $ class.foreachPartition (JavaRDDLike.scala: 219)
в org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition (JavaRDDLike.scala: 45)
на SparkProcess $ 1.run (SparkProcess.java:94)
в java.base / java.util.TimerThread.mainLoop (Timer.java:556)
в java.base / java.util.TimerThread.run (Timer.java:506)
Вызывается: java.io.NotSerializableException: SparkProcess $ 1
Стек сериализации:
- объект не сериализуем (класс: SparkProcess $ 1, значение: SparkProcess $ 1 @ 28612f9b)
- элемент массива (индекс: 0)
- массив (класс [Ljava.lang.Object ;, размер 1)
- поле (класс: java.lang.invoke.SerializedLambda, имя: capturedArgs, тип: class [Ljava.lang.Object;)
- объект (класс java.lang.invoke.SerializedLambda, SerializedLambda [capturingClass = класс SparkProcess $ 1, functionsInterfaceMethod = org / apache / spark / api / java / function / VoidFunction.call: (Ljava / lang / Object;) V, реализация = invokeSpecial SparkProcess $ 1.lambda $ run $ e3b46054 $ 1: (Ljava / util / Iterator;) V, instantiatedMethodType = (Ljava / util / Iterator;) V, numCaptured = 1])
- writeReplace data (класс: java.lang.invoke.SerializedLambda)
- объект (класс SparkProcess $ 1 $$ Lambda $ 105/77425562, SparkProcess $ 1 $$ Lambda $ 105/77425562 @ 346d59fc)
- поле (класс: org.apache.spark.api.java.JavaRDDLike $$ anonfun $ foreachPartition $ 1, имя: f $ 12, тип: interface org.apache.spark.api.java.function.VoidFunction)
- объект (класс org.apache.spark.api.java.JavaRDDLike $$ anonfun $ foreachPartition $ 1,)
в org.apache.spark.serializer.SerializationDebugger $ .improveException (SerializationDebugger.scala: 40)
в org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 46)
в org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 100)
в org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 295)
... еще 14
Моя текущая функция выглядит так:
public void cacheToTopic(){
Timer t = new Timer();
t.scheduleAtFixedRate(
new TimerTask()
{
public void run()
{
pairRdd.foreachPartition(record->{
Producer<String, String> producer=createKafkaProducer();
ProducerRecord<String, String> data = new ProducerRecord<String, String>("output"
, DataObjectFactory.getRawJSON(record));
producer.send(data);
});
}
},
3000, // run first occurrence after three seconds
3000); // run every three seconds
}