Мне удалось придумать это:
def valueAvroDeserializer[A](schemaRegistryUrl: String, targetType: Class[A]): Deserializer[A] = {
val readerSchema = ReflectData.get().getSchema(targetType)
val idSize = 4
val deserializer = new AbstractKafkaAvroDeserializer with Deserializer[A] {
def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
this.configure(new KafkaAvroDeserializerConfig(configs))
def deserialize(topic: String, data: Array[Byte]): A = {
val bytes = ByteBuffer.wrap(data)
bytes.get() // skip magic byte
val schemaId = bytes.getInt()
val writerSchema = schemaRegistry.getById(schemaId)
val length = bytes.limit() - 1 - idSize
val reader = new ReflectDatumReader[A](writerSchema, readerSchema)
val decoder = DecoderFactory.get().binaryDecoder(bytes.array(), bytes.position(), length, null)
reader.read(null.asInstanceOf[A], decoder)
}
def close(): Unit = {}
}
val props = Map("schema.registry.url" -> schemaRegistryUrl)
deserializer.configure(props.asJava, false)
deserializer
}