Крио Сериализация выпуск Spark - PullRequest
0 голосов
/ 05 октября 2018

Я пытаюсь зарегистрироваться ниже класса для сериализации Kryo в коде искры, но я получаю сообщение об ошибке.

Код:

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()
  def send(topic: String, key: String, value: String): Unit = producer.send(new ProducerRecord(topic, key, value))
}

object KafkaSink {
  def apply(config: Properties): KafkaSink = {
    val f = () => {
      val producer = new KafkaProducer[String, String](config)

      //close producer if VM exits
      sys.addShutdownHook {
        producer.close()
      }
      producer
    }
    new KafkaSink(f)
  }
}

Ошибка:

Причина: java.lang.IllegalArgumentException: класс не зарегистрирован: com.test.KafkaSink $$ anonfun $ 1 Примечание: Для регистрации этого класса используйте: kryo.register (com.test.KafkaSink $$ anonfun $ 1.class);на com.esotericsoftware.kryo.Kryo.getRegistration (Kryo.java:488) на com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass (DefaultClassResolver.java:97) на com.esotericsoftware.kryo.Cryowrite: 517) at com.esotericsoftware.kryo.serializers.ObjectField.write (ObjectField.java:76) ... еще 14

Я попытался зарегистрировать класс, используя следующие 2 подхода, которые неработать и дает мне ту же ошибку

kryo.register(classOf[KafkaSink])
kryo.register(KafkaSink.getClass)

Как я могу зарегистрировать этот класс?

...