Я пытаюсь зарегистрироваться ниже класса для сериализации Kryo в коде искры, но я получаю сообщение об ошибке.
Код:
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, key: String, value: String): Unit = producer.send(new ProducerRecord(topic, key, value))
}
object KafkaSink {
def apply(config: Properties): KafkaSink = {
val f = () => {
val producer = new KafkaProducer[String, String](config)
//close producer if VM exits
sys.addShutdownHook {
producer.close()
}
producer
}
new KafkaSink(f)
}
}
Ошибка:
Причина: java.lang.IllegalArgumentException: класс не зарегистрирован: com.test.KafkaSink $$ anonfun $ 1 Примечание: Для регистрации этого класса используйте: kryo.register (com.test.KafkaSink $$ anonfun $ 1.class);на com.esotericsoftware.kryo.Kryo.getRegistration (Kryo.java:488) на com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass (DefaultClassResolver.java:97) на com.esotericsoftware.kryo.Cryowrite: 517) at com.esotericsoftware.kryo.serializers.ObjectField.write (ObjectField.java:76) ... еще 14
Я попытался зарегистрировать класс, используя следующие 2 подхода, которые неработать и дает мне ту же ошибку
kryo.register(classOf[KafkaSink])
kryo.register(KafkaSink.getClass)
Как я могу зарегистрировать этот класс?