Я получил эту ошибку, когда использовал свои функции UDF в своем приложении spark (write by java).
org.apache.spark.SparkException: задача не сериализуется в org.apache.spark.util.ClosureCleaner $.ensureSerializable (ClosureCleaner.scala: 403) ... Причина: java.io.NotSerializableException: jp.co.nec.necdas.commons.customize.service.dataset.ALMTriggerProcessLogic Стек сериализации: - объект не сериализуем (класс: jp. co.nec.necdas.commons.customize.service.dataset.ALMTriggerProcessLogic, значение: jp.co.nec.necdas.commons.customize.service.dataset.ALMTriggerProcessLogic@f237ae7)
И это мой код:
//"alarmMeasure" is a dataset from postgres
//"macroInfo" is also a dataset,but from csv
alarmMeasure.sparkSession().sqlContext().udf().register("genrateKeyId", new UDF2<String,String,String>() {
@Override
public String call(String almDetectionCode,String time) throws Exception {
StringBuilder keyId = new StringBuilder();
time = DateTimeUtils.transform(time,"yyyy-MM-dd hh:mm:ss","yyyyMMddhhmm");
keyId.append("KNLG");
keyId.append("_");
keyId.append(almDetectionCode);
keyId.append("_");
keyId.append(time);
return keyId.toString();
}
}, DataTypes.StringType);
Dataset tmp = alarmMeasure.join(macroInfo,alarmMeasure.col("deviceName")
.equalTo(macroInfo.col("deviceName")),"inner")
.drop(macroInfo.col("deviceName"));
tmp.withColumn("KeyId",functions.callUDF("genrateKeyId",tmp.col("alarmDectionCode"),tmp.col("alarmDectionCode").show();
Некоторые блоги говорили мне, что я должен реализовать java.io.Serializable, поэтому я попытался, но получил такую же ошибку, как это:
Причина: java.io.NotSerializableException: jp. co.nec.necdas.commons.spark.SparkContextManager Стек сериализации: - объект не сериализуем (класс: jp.co.nec.necdas.commons.spark.SparkContextManager, значение: jp.co.nec.necdas.commons.spark.SparkContextManager @ 153cfd86)
«SparkContextManager» - это API, используемый в моем классе, означает ли это, что я должен гарантировать, что весь класс, используемый в моем классе, реализует java.io.Serializable?