Выход Spark Rdd в тему Кафки - PullRequest
0 голосов
/ 03 июля 2018

У меня есть pairRdd, постоянно получающий данные, я хочу выводить его содержимое в тему kafka каждые x минут, затем удаляю его содержимое.

Я пробовал пару вещей, но каждый раз это выдавало эту ошибку.

Исключение в потоке "Таймер-1" org.apache.spark.SparkException: задача не сериализуема в org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 298) в org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean (ClosureCleaner.scala: 288) в org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 108) в org.apache.spark.SparkContext.clean (SparkContext.scala: 2287) в org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply (RDD.scala: 925) в org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply (RDD.scala: 924) в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151) в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 112) в org.apache.spark.rdd.RDD.withScope (RDD.scala: 362) в org.apache.spark.rdd.RDD.foreachPartition (RDD.scala: 924) в org.apache.spark.api.java.JavaRDDLike $ class.foreachPartition (JavaRDDLike.scala: 219) в org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition (JavaRDDLike.scala: 45) на SparkProcess $ 1.run (SparkProcess.java:94) в java.base / java.util.TimerThread.mainLoop (Timer.java:556) в java.base / java.util.TimerThread.run (Timer.java:506) Вызывается: java.io.NotSerializableException: SparkProcess $ 1 Стек сериализации: - объект не сериализуем (класс: SparkProcess $ 1, значение: SparkProcess $ 1 @ 28612f9b) - элемент массива (индекс: 0) - массив (класс [Ljava.lang.Object ;, размер 1) - поле (класс: java.lang.invoke.SerializedLambda, имя: capturedArgs, тип: class [Ljava.lang.Object;) - объект (класс java.lang.invoke.SerializedLambda, SerializedLambda [capturingClass = класс SparkProcess $ 1, functionsInterfaceMethod = org / apache / spark / api / java / function / VoidFunction.call: (Ljava / lang / Object;) V, реализация = invokeSpecial SparkProcess $ 1.lambda $ run $ e3b46054 $ 1: (Ljava / util / Iterator;) V, instantiatedMethodType = (Ljava / util / Iterator;) V, numCaptured = 1]) - writeReplace data (класс: java.lang.invoke.SerializedLambda) - объект (класс SparkProcess $ 1 $$ Lambda $ 105/77425562, SparkProcess $ 1 $$ Lambda $ 105/77425562 @ 346d59fc) - поле (класс: org.apache.spark.api.java.JavaRDDLike $$ anonfun $ foreachPartition $ 1, имя: f $ 12, тип: interface org.apache.spark.api.java.function.VoidFunction) - объект (класс org.apache.spark.api.java.JavaRDDLike $$ anonfun $ foreachPartition $ 1,) в org.apache.spark.serializer.SerializationDebugger $ .improveException (SerializationDebugger.scala: 40) в org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 46) в org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 100) в org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 295) ... еще 14

Моя текущая функция выглядит так:

   public void cacheToTopic(){

    Timer t = new Timer();
    t.scheduleAtFixedRate(
            new TimerTask()
            {
                public void run()
                {                         

                    pairRdd.foreachPartition(record->{
                        Producer<String, String> producer=createKafkaProducer();
                        ProducerRecord<String, String> data = new ProducerRecord<String, String>("output"
                                , DataObjectFactory.getRawJSON(record));

                        producer.send(data);

                    });
                }
            },
            3000,      // run first occurrence after three seconds
            3000);  // run every three seconds
}
...