Flink: org.apache.flink.api.common.InvalidProgramException Объект KafkaSink $$ anon $ 1 @ 58f174d9 не сериализуем - PullRequest
0 голосов
/ 23 мая 2019

Я пишу схему сериализации для производителя Flink Kafka.Вот мой исходный код.

@SerialVersionUID(100L)
class KafkaSink private (dataStream: DataStream[Map[String, Any]], source: String) extends Logging with Serializable {

  private[sink] lazy val kafkaSink = new FlinkKafkaProducer010[Map[String, Any]](
    // broker list
    ExecutionEnv.sinkTopic, // target topic
    new KeyedSerializationSchema[Map[String, Any]] {
      override def serializeKey(element: Map[String, Any]): Array[Byte] = KafkaSink.this
        .keySerializer(element)
      //      {
      //        val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
      //        val oos = new ObjectOutputStream(stream)
      //        oos.writeObject(element("CMLS_ACCT_NUM"))
      //        oos.close()
      //        stream.toByteArray
      //      }

      override def serializeValue(element: Map[String, Any]): Array[Byte] = {
        val avroRecord: GenericData.Record = new GenericData.Record(schema)
        for ((k, v) <- element) avroRecord.put(k, v)
        val stream: Array[Byte] = recordInjection.apply(avroRecord)
        stream
      }

      override def getTargetTopic(element: Map[String, Any]): String = ExecutionEnv.sinkTopic
    }, ExecutionEnv.kafkaSinkProperties)

  def keySerializer(element: Map[String, Any]) = {

    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    oos.writeObject(element("CMLS_ACCT_NUM"))
    oos.close()
    stream.toByteArray
  }

  private[sink] def valueSerializer(element: Map[String, Any]) = {

    val avroRecord: GenericData.Record = new GenericData.Record(schema)
    for ((k, v) <- element) avroRecord.put(k, v)
    val stream: Array[Byte] = recordInjection.apply(avroRecord)
    stream
  }

  def sendToKafka[T]: Unit = dataStream.addSink(kafkaSink)

}

object KafkaSink {
  def apply(dataStream: DataStream[Map[String, Any]], source: String): KafkaSink = new KafkaSink(dataStream, source)
}

Почему я получаю сообщение об ошибке

org.apache.flink.api.common.InvalidProgramException: объект KafkaSink $$ anon $ 1 @ 736caf7a не сериализуем

, хотя я сделал класс KafkaSink сериализуемым.Эта программа работает, когда у меня анонимный класс KeyedSerializationSchema переопределяет keySerializer и valueSerializer в блоке кода, а не в методе из внешнего класса (KafkaSink).

...