Я использую сериализованные сообщения Avro от Kafka, используя десериализатор "automati c", такой как:
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroDeserializer"
);
props.put("schema.registry.url", "https://example.com");
Это прекрасно работает, и прямо из документов на https://docs.confluent.io/current/schema-registry/serializer-formatter.html#serializer .
Проблема, с которой я сталкиваюсь, заключается в том, что я просто хочу пересылать эти сообщения, но для маршрутизации мне нужны метаданные изнутри. Некоторые технические ограничения означают, что я не могу реально скомпилировать сгенерированные файлы классов, чтобы использовать KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG => true
, поэтому я использую обычный декодер, не привязываясь к Kafka, а именно просто читая байты как Array[Byte]
и передавая их в десериализатор, созданный вручную:
var maxSchemasToCache = 1000;
var schemaRegistryURL = "https://example.com/"
var specificDeserializerProps = Map(
"schema.registry.url"
-> schemaRegistryURL,
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG
-> "false"
);
var client = new CachedSchemaRegistryClient(
schemaRegistryURL,
maxSchemasToCache
);
var deserializer = new KafkaAvroDeserializer(
client,
specificDeserializerProps.asJava
);
Сообщения имеют тип «контейнер», с действительно интересной частью одного из ~ 25 типов в поле записи union { A, B, C } msg
:
record Event {
timestamp_ms created_at;
union {
Online,
Offline,
Available,
Unavailable,
...
...Failed,
...Updated
} msg;
}
Итак, я успешно читаю Array[Byte]
в record
и подаю его в десериализатор следующим образом:
var genericRecord = deserializer.deserialize(topic, consumerRecord.value())
.asInstanceOf[GenericRecord];
var schema = genericRecord.getSchema();
var msgSchema = schema.getField("msg").schema();
Однако проблема в том, что я не могу найти ничего, чтобы различить, различить или «разрешить» «тип» поля msg
через объединение:
System.out.printf(
"msg.schema = %s msg.schema.getType = %s\n",
msgSchema.getFullName(),
msgSchema.getType().name());
=> msg.schema = union msg.schema.getType = union
Как различать типы в этом сценарии? Слитный реестр знает, что у этих вещей есть имена, у них есть «типы», даже если я отношусь к ним как GenericRecords
,
Моя цель здесь - узнать, что record.msg
имеет «тип» Online | Offline | Available
, а не просто зная, что это union
.