Avro GenericRecord для вложенных POJO - PullRequest
1 голос
/ 06 января 2020

Есть ли способ десериализации GenericRecord (который я только что получил из сообщения Kafka) во вложенный POJO? На самом деле я пытаюсь десериализовать его в класс Scala, но я понимаю, что это еще сложнее. Я искал через inte rnet и кажется, что все делали это вручную. Вам известна какая-нибудь библиотека, способная сделать это?

Ответы [ 2 ]

0 голосов
/ 06 января 2020

Мне удалось придумать это:

  def valueAvroDeserializer[A](schemaRegistryUrl: String, targetType: Class[A]): Deserializer[A] = {
val readerSchema = ReflectData.get().getSchema(targetType)
val idSize = 4


val deserializer = new AbstractKafkaAvroDeserializer with Deserializer[A] {
    def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
      this.configure(new KafkaAvroDeserializerConfig(configs))

  def deserialize(topic: String, data: Array[Byte]): A = {
      val bytes = ByteBuffer.wrap(data)
      bytes.get() // skip magic byte
      val schemaId = bytes.getInt()
      val writerSchema = schemaRegistry.getById(schemaId)
      val length = bytes.limit() - 1 - idSize
      val reader = new ReflectDatumReader[A](writerSchema, readerSchema)
      val decoder = DecoderFactory.get().binaryDecoder(bytes.array(), bytes.position(), length, null)
      reader.read(null.asInstanceOf[A], decoder)
    }

  def close(): Unit = {}
}
val props = Map("schema.registry.url" -> schemaRegistryUrl)
deserializer.configure(props.asJava, false)
deserializer

}

0 голосов
/ 06 января 2020

Существует довольно обобщенный c код c деривационное решение для аппликативных схем:

https://github.com/danslapman/morphling

Это не обеспечивает "импорт и использование" "Решение, но оно предоставляет способ написать собственный механизм деривации кода c для вашего протокола, не путаясь с бесформенным / магнолией.

Также, если вам нужно иметь дело с двоичными данными, попробуйте:

https://github.com/scodec/scodec

Это обеспечивает довольно scala -путь решения таких проблемы.

...