Exception when saving Parquet - SparkException: Task not serializable - NotSerializableException - object not serializable
0 votes
/ January 5, 2020

The DiagnosticEvent class is generated by Avro, and it also has a serialVersionUID.
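
For reference, the generated class roughly has the shape sketched below (exact contents depend on the Avro version and schema; this is not the real class). A serialVersionUID field by itself does not make a class java.io.Serializable: the class or one of its superclasses must implement Serializable/Externalizable, which the Avro-generated base classes only do in some Avro versions, and that would be consistent with the NotSerializableException further down.

    // Rough, hypothetical shape of the Avro-generated class (not the actual DiagnosticEvent source).
    // Note: the serialVersionUID alone does not imply java.io.Serializable --
    // the class or a superclass must implement Serializable/Externalizable.
    public class DiagnosticEvent extends org.apache.avro.specific.SpecificRecordBase
            implements org.apache.avro.specific.SpecificRecord {
        private static final long serialVersionUID = 1L; // value derived from the schema
        // ... generated fields, getters and setters ...
    }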

20/01/05 09:56:09 ERROR nodeStatsConfigDriven.NodeStatsKafkaProcessor: DiagnosticEvent Exception occurred during parquet saving
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2289)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2063)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1462)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1461)
    at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
    at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
    at Spark2ParquetEngine.nodeStatsConfigDriven.NodeStatsKafkaProcessor.processSubRecordList(NodeStatsKafkaProcessor.java:463)
    at Spark2ParquetEngine.nodeStatsConfigDriven.NodeStatsKafkaProcessor.access$100(NodeStatsKafkaProcessor.java:42)
    at Spark2ParquetEngine.nodeStatsConfigDriven.NodeStatsKafkaProcessor$4.call(NodeStatsKafkaProcessor.java:252)
    at Spark2ParquetEngine.nodeStatsConfigDriven.NodeStatsKafkaProcessor$4.call(NodeStatsKafkaProcessor.java:273)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: com.servicenow.bigdata.schema.nodeStats.DiagnosticEvent
Serialization stack:
    - object not serializable (class: com.servicenow.bigdata.schema.nodeStats.DiagnosticEvent, value: {"schema_version": 2, etc...})
    - writeObject data (class: java.util.ArrayList)
    - object (class java.util.ArrayList, [{"schema_version": 2, etc...}])
    - writeObject data (class: java.util.HashMap)
    - object (class java.util.HashMap, {d12e3671478a0602f82a17c94c88155a=[{"schema_version": 2, etc}]

I have searched for this problem and tried every approach suggested in related posts, but I could not resolve it in my streaming application. Please help me fix this issue when saving the data from the streaming application.

Code:

The exception is thrown from the nodeStats.foreachRDD call below:

for(ChildNodeConfig childNodeConfig : xmlStatsConfig.getChildNodesList().getChildNodes()){
    if(childNodeConfig.isUseStatefulCacheFilter() &&  statefulFiltersMap.containsKey(childNodeConfig.getNodeFilterConfig().getName())){
        //logger.info("Applying stateful filter : "+childNodeConfig.getNodeFilterConfig().getName());
        JavaPairDStream<Void, Node>  nodeStats  = statefulFiltersMap.get(childNodeConfig.getNodeFilterConfig().getName()).applyStatefulFilter(nodeStats,intialStatefulRDDs.get(childNodeConfig.getNodeFilterConfig().getName()),hiveContext);

        nodeStats.foreachRDD(new VoidFunction<JavaPairRDD<Void,Node>>() {
            @Override
            public void call(JavaPairRDD<Void, Node> rdd) throws Exception {
                try {

                    //Only Saving Diagnostic_Events - End */
                    logger.error("Filter the RDD into currentPartition");
                    // filter the RDD into two partitions
                    JavaPairRDD<Void, Node> currentPartition = rdd.filter(new Function<Tuple2<Void, Node>, Boolean>() {
                        @Override
                        public Boolean call(Tuple2<Void, Node> tuple) throws Exception {
                            if (tuple != null) {
                                Node nodeStats = tuple._2();
                                if (nodeStats != null && nodeStats.getCollectionTimestamp() != null) {
                                    long timestamp = nodeStats.getCollectionTimestamp()  * 1000;
                                    if (timestamp >= currHourTimestamp) {
                                        return true;
                                    }
                                }
                            }
                            return false;
                        }
                    });

                    if(processChildNodes || processAll) {
                        //logger.info("Processing Child Nodes");

                        for (ChildNodeConfig childNodeConfig : xmlStatsConfig.getChildNodesList().getChildNodes()) {
                            //logger.info("Testing: calling processSubRecordList for childNode-" + childNodeConfig.getName());
                            processSubRecordList(currentPartition, prevPartition, currentTime, properties,
                                    childNodeConfig.getName(), childNodeConfig.getNamespace(), childNodeConfig.getHdfsdir(), true);
                        }

                    }

                } catch (Exception ex) {
                    logger.error("Exception occurred during parquet saving", ex);
                }
            }
        });
    }
}
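
For context, the serialization stack in the exception shows a HashMap of DiagnosticEvent lists being pulled into the serialized closure, which commonly happens when an anonymous inner class like the filter above holds an implicit reference to its enclosing instance. A minimal sketch of the kind of standalone filter that is often suggested for this error, assuming the Node type from the code above (the class name is made up), so that only a primitive timestamp is captured:

    import org.apache.spark.api.java.function.Function;
    import scala.Tuple2;

    // Sketch only: a standalone filter that captures a single primitive, so serializing
    // the closure does not drag in the enclosing object or its DiagnosticEvent collections.
    public class CurrentHourFilter implements Function<Tuple2<Void, Node>, Boolean> {
        private static final long serialVersionUID = 1L;
        private final long currHourTimestamp;

        public CurrentHourFilter(long currHourTimestamp) {
            this.currHourTimestamp = currHourTimestamp;
        }

        @Override
        public Boolean call(Tuple2<Void, Node> tuple) {
            if (tuple == null || tuple._2() == null) {
                return false;
            }
            Long collectionTimestamp = tuple._2().getCollectionTimestamp();
            return collectionTimestamp != null && collectionTimestamp * 1000 >= currHourTimestamp;
        }
    }

    // usage inside the foreachRDD body:
    // JavaPairRDD<Void, Node> currentPartition = rdd.filter(new CurrentHourFilter(currHourTimestamp));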

The exception is thrown at this point, when evaluating !currentPartition.isEmpty():

if (!currentPartition.isEmpty() && !currentPartition.partitions().isEmpty()) {
    logger.error("Saving the Child:: " + targetClass + " save current partition to " + tempPath);
    logger.info(targetClass + " save current partition to " + tempPath);
    currentPartition.saveAsNewAPIHadoopFile(tempPath + "/current",
            Void.class, clazz, AvroParquetOutputFormat.class, sparkJob.getConfiguration());
    logger.info(targetClass + " saved current partition to " + tempPath);
    // move the output parquet files into their respective partition
    UtilSpark2.moveTempParquetToDestHDFS(tempPath + "/current", hdfsOutputDir + currentDir, String.valueOf(currentTime), hdfs);
} else
    logger.info(targetClass + " Empty current partition.");
...
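
For completeness, sparkJob in the snippet above is assumed to be a Hadoop Job whose configuration carries the Avro write schema for AvroParquetOutputFormat. A sketch of how such a job is commonly prepared (class, method and variable names are illustrative, assuming the org.apache.parquet parquet-avro artifact):

    import org.apache.avro.Schema;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.parquet.avro.AvroParquetOutputFormat;

    // Sketch only: preparing the Hadoop Job whose configuration is passed to saveAsNewAPIHadoopFile.
    public class ParquetJobSketch {
        public static Job newParquetJob(Schema writeSchema) throws java.io.IOException {
            Job job = Job.getInstance();
            // The output format needs the Avro schema it should write,
            // e.g. DiagnosticEvent.getClassSchema() for the child-node records.
            AvroParquetOutputFormat.setSchema(job, writeSchema);
            return job;
        }
    }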