The DiagnosticEvent class is an Avro-generated class, and it also has a serialVersionUID.
20/01/05 09:56:09 ERROR nodeStatsConfigDriven.NodeStatsKafkaProcessor: DiagnosticEvent Exception occurred during parquet saving
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2289)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2063)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1462)
at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1461)
at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
at Spark2ParquetEngine.nodeStatsConfigDriven.NodeStatsKafkaProcessor.processSubRecordList(NodeStatsKafkaProcessor.java:463)
at Spark2ParquetEngine.nodeStatsConfigDriven.NodeStatsKafkaProcessor.access$100(NodeStatsKafkaProcessor.java:42)
at Spark2ParquetEngine.nodeStatsConfigDriven.NodeStatsKafkaProcessor$4.call(NodeStatsKafkaProcessor.java:252)
at Spark2ParquetEngine.nodeStatsConfigDriven.NodeStatsKafkaProcessor$4.call(NodeStatsKafkaProcessor.java:273)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: com.servicenow.bigdata.schema.nodeStats.DiagnosticEvent
Serialization stack:
- object not serializable (class: com.servicenow.bigdata.schema.nodeStats.DiagnosticEvent, value: {"schema_version": 2, etc...})
- writeObject data (class: java.util.ArrayList)
- object (class java.util.ArrayList, [{"schema_version": 2, etc...}])
- writeObject data (class: java.util.HashMap)
- object (class java.util.HashMap, {d12e3671478a0602f82a17c94c88155a=[{"schema_version": 2, etc}]
I searched for this problem and tried every approach suggested in the posts I found, but I still could not resolve it in my streaming application. Please help me fix this issue so that the streaming application can save the data.
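For example, one kind of change suggested in those posts is switching the data serializer to Kryo and registering the Avro-generated classes. A minimal sketch of that, with a made-up class name and not my exact code (and, as far as I understand, it only changes spark.serializer, while the trace above comes from ClosureCleaner, which still uses plain Java serialization for the closure):

import org.apache.spark.SparkConf;
import com.servicenow.bigdata.schema.nodeStats.DiagnosticEvent;

public class KryoConfSketch {
    // Sketch: register the Avro-generated class with Kryo. This affects how
    // shuffled/cached data is serialized, not how the driver serializes the
    // foreachRDD/filter closures themselves.
    public static SparkConf buildConf() {
        return new SparkConf()
                .setAppName("NodeStatsKafkaProcessor")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class<?>[]{ DiagnosticEvent.class });
    }
}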
Code ::
The exception is raised inside the nodeStats.foreachRDD call below:
for (ChildNodeConfig childNodeConfig : xmlStatsConfig.getChildNodesList().getChildNodes()) {
    if (childNodeConfig.isUseStatefulCacheFilter() && statefulFiltersMap.containsKey(childNodeConfig.getNodeFilterConfig().getName())) {
        //logger.info("Applying stateful filter : " + childNodeConfig.getNodeFilterConfig().getName());
        JavaPairDStream<Void, Node> nodeStats = statefulFiltersMap.get(childNodeConfig.getNodeFilterConfig().getName())
                .applyStatefulFilter(nodeStats, intialStatefulRDDs.get(childNodeConfig.getNodeFilterConfig().getName()), hiveContext);
        nodeStats.foreachRDD(new VoidFunction<JavaPairRDD<Void, Node>>() {
            @Override
            public void call(JavaPairRDD<Void, Node> rdd) throws Exception {
                try {
                    //Only Saving Diagnostic_Events - End */
                    logger.error("Filter the RDD into currentPartition");
                    // filter the RDD into two partitions
                    JavaPairRDD<Void, Node> currentPartition = rdd.filter(new Function<Tuple2<Void, Node>, Boolean>() {
                        @Override
                        public Boolean call(Tuple2<Void, Node> tuple) throws Exception {
                            if (tuple != null) {
                                Node nodeStats = tuple._2();
                                if (nodeStats != null && nodeStats.getCollectionTimestamp() != null) {
                                    long timestamp = nodeStats.getCollectionTimestamp() * 1000;
                                    if (timestamp >= currHourTimestamp) {
                                        return true;
                                    }
                                }
                            }
                            return false;
                        }
                    });
                    if (processChildNodes || processAll) {
                        //logger.info("Processing Child Nodes");
                        for (ChildNodeConfig childNodeConfig : xmlStatsConfig.getChildNodesList().getChildNodes()) {
                            //logger.info("Testing: calling processSubRecordList for childNode-" + childNodeConfig.getName());
                            processSubRecordList(currentPartition, prevPartition, currentTime, properties,
                                    childNodeConfig.getName(), childNodeConfig.getNamespace(), childNodeConfig.getHdfsdir(), true);
                        }
                    }
                } catch (Exception ex) {
                    logger.error("Exception occurred during parquet saving", ex);
                }
            }
        });
    }
}
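From the serialization stack it looks like the anonymous filter/foreach classes pull in their enclosing object and, through it, a HashMap of DiagnosticEvent lists. The posts I found also suggest moving such functions into static nested (or top-level) classes that hold only the serializable values they need; a rough sketch of that pattern, with made-up names rather than my real code:

import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
import com.servicenow.bigdata.schema.nodeStats.Node; // assumed package, same as DiagnosticEvent

// Sketch of a filter with no reference to the enclosing NodeStatsKafkaProcessor:
// it carries only the primitive cutoff, so task serialization has nothing else
// (e.g. maps of DiagnosticEvent) to drag along. Class name is made up.
public class CurrentHourFilter implements Function<Tuple2<Void, Node>, Boolean> {
    private static final long serialVersionUID = 1L;
    private final long currHourTimestamp;

    public CurrentHourFilter(long currHourTimestamp) {
        this.currHourTimestamp = currHourTimestamp;
    }

    @Override
    public Boolean call(Tuple2<Void, Node> tuple) throws Exception {
        if (tuple == null || tuple._2() == null || tuple._2().getCollectionTimestamp() == null) {
            return false;
        }
        return tuple._2().getCollectionTimestamp() * 1000 >= currHourTimestamp;
    }
}

// usage inside foreachRDD:
// JavaPairRDD<Void, Node> currentPartition = rdd.filter(new CurrentHourFilter(currHourTimestamp));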
The exception occurs at this point, on the !currentPartition.isEmpty() check:
if (!currentPartition.isEmpty() && !currentPartition.partitions().isEmpty()) {
    logger.error("Saving the Child:: " + targetClass + " save current partition to " + tempPath);
    logger.info(targetClass + " save current partition to " + tempPath);
    currentPartition.saveAsNewAPIHadoopFile(tempPath + "/current",
            Void.class, clazz, AvroParquetOutputFormat.class, sparkJob.getConfiguration());
    logger.info(targetClass + " saved current partition to " + tempPath);
    // move the output parquet files into their respective partition
    UtilSpark2.moveTempParquetToDestHDFS(tempPath + "/current", hdfsOutputDir + currentDir, String.valueOf(currentTime), hdfs);
} else {
    logger.info(targetClass + " Empty current partition.");
}
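For context, sparkJob above is a Hadoop Job prepared for Parquet output. Simplified, it is set up roughly like this (a sketch of my understanding only; the AvroParquetOutputFormat package may differ by Parquet version, and DiagnosticEvent stands in here for the child-node class bound to clazz):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import com.servicenow.bigdata.schema.nodeStats.DiagnosticEvent;

public class ParquetJobSketch {
    // Builds the Hadoop Job whose configuration is passed to saveAsNewAPIHadoopFile.
    // The Avro write schema is attached so AvroParquetOutputFormat can map the
    // records to Parquet; DiagnosticEvent stands in for the per-child-node class.
    public static Job buildParquetJob() throws IOException {
        Job job = Job.getInstance(new Configuration());
        AvroParquetOutputFormat.setSchema(job, DiagnosticEvent.getClassSchema());
        return job;
    }
}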