Я пишу схему сериализации для производителя Flink Kafka.Вот мой исходный код.
@SerialVersionUID(100L)
class KafkaSink private (dataStream: DataStream[Map[String, Any]], source: String) extends Logging with Serializable {
private[sink] lazy val kafkaSink = new FlinkKafkaProducer010[Map[String, Any]](
// broker list
ExecutionEnv.sinkTopic, // target topic
new KeyedSerializationSchema[Map[String, Any]] {
override def serializeKey(element: Map[String, Any]): Array[Byte] = KafkaSink.this
.keySerializer(element)
// {
// val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
// val oos = new ObjectOutputStream(stream)
// oos.writeObject(element("CMLS_ACCT_NUM"))
// oos.close()
// stream.toByteArray
// }
override def serializeValue(element: Map[String, Any]): Array[Byte] = {
val avroRecord: GenericData.Record = new GenericData.Record(schema)
for ((k, v) <- element) avroRecord.put(k, v)
val stream: Array[Byte] = recordInjection.apply(avroRecord)
stream
}
override def getTargetTopic(element: Map[String, Any]): String = ExecutionEnv.sinkTopic
}, ExecutionEnv.kafkaSinkProperties)
def keySerializer(element: Map[String, Any]) = {
val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(stream)
oos.writeObject(element("CMLS_ACCT_NUM"))
oos.close()
stream.toByteArray
}
private[sink] def valueSerializer(element: Map[String, Any]) = {
val avroRecord: GenericData.Record = new GenericData.Record(schema)
for ((k, v) <- element) avroRecord.put(k, v)
val stream: Array[Byte] = recordInjection.apply(avroRecord)
stream
}
def sendToKafka[T]: Unit = dataStream.addSink(kafkaSink)
}
object KafkaSink {
def apply(dataStream: DataStream[Map[String, Any]], source: String): KafkaSink = new KafkaSink(dataStream, source)
}
Почему я получаю сообщение об ошибке
org.apache.flink.api.common.InvalidProgramException: объект KafkaSink $$ anon $ 1 @ 736caf7a не сериализуем
, хотя я сделал класс KafkaSink сериализуемым.Эта программа работает, когда у меня анонимный класс KeyedSerializationSchema переопределяет keySerializer и valueSerializer в блоке кода, а не в методе из внешнего класса (KafkaSink).