java.io.NotSerializableException в Spark - PullRequest
       25

java.io.NotSerializableException в Spark

0 голосов
/ 30 октября 2018

Ниже приведена часть моей искровой работы:

def parse(evt: Event): String = {
  try {
    val config = new java.util.HashMap[java.lang.String, AnyRef] // Line1
    config.put("key", "value") // Line2
    val decoder = new DeserializerHelper(config, classOf[GenericRecord]) // Line3
    val payload = decoder.deserializeData(evt.getId, evt.toBytes)
    val record = payload.get("data")
    record.toString
  } catch {
    case e :Exception => "exception:" + e.toString
  }
}

try {
  val inputStream = KafkaUtils.createDirectStream(
    ssc,
    PreferConsistent,
    Subscribe[String, String](Array(inputTopic), kafkaParams)
  )
  val processedStream = inputStream.map(record => parse(record.value()))
  processedStream.print()
} finally {
}

Если я переместил LINE1-LINE3 в вышеприведенных кодах за пределы функции parse(), я получил

Caused by: java.io.NotSerializableException: SchemaDeserializerHelper
Serialization stack:
    - object not serializable (class: SchemaDeserializerHelper, value: SchemaDeserializerHelper@2e23c180)
    - field (class: App$$anonfun$1, name: decoder$1, type: class SchemaDeserializerHelper)
    - object (class App$$anonfun$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
    ... 22 more

Почему? Я не люблю помещать Line1 ~ Line3 в функцию parse(), как это оптимизировать?

Спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...