Ниже приведена часть моей искровой работы:
def parse(evt: Event): String = {
try {
val config = new java.util.HashMap[java.lang.String, AnyRef] // Line1
config.put("key", "value") // Line2
val decoder = new DeserializerHelper(config, classOf[GenericRecord]) // Line3
val payload = decoder.deserializeData(evt.getId, evt.toBytes)
val record = payload.get("data")
record.toString
} catch {
case e :Exception => "exception:" + e.toString
}
}
try {
val inputStream = KafkaUtils.createDirectStream(
ssc,
PreferConsistent,
Subscribe[String, String](Array(inputTopic), kafkaParams)
)
val processedStream = inputStream.map(record => parse(record.value()))
processedStream.print()
} finally {
}
Если я переместил LINE1-LINE3 в вышеприведенных кодах за пределы функции parse()
, я получил
Caused by: java.io.NotSerializableException: SchemaDeserializerHelper
Serialization stack:
- object not serializable (class: SchemaDeserializerHelper, value: SchemaDeserializerHelper@2e23c180)
- field (class: App$$anonfun$1, name: decoder$1, type: class SchemaDeserializerHelper)
- object (class App$$anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
... 22 more
Почему? Я не люблю помещать Line1 ~ Line3 в функцию parse()
, как это оптимизировать?
Спасибо