Различение типа объединения AVRO - PullRequest
3 голосов
/ 21 января 2020

Я использую сериализованные сообщения Avro от Kafka, используя десериализатор "automati c", такой как:

props.put(
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroDeserializer"
);
props.put("schema.registry.url", "https://example.com");

Это прекрасно работает, и прямо из документов на https://docs.confluent.io/current/schema-registry/serializer-formatter.html#serializer .

Проблема, с которой я сталкиваюсь, заключается в том, что я просто хочу пересылать эти сообщения, но для маршрутизации мне нужны метаданные изнутри. Некоторые технические ограничения означают, что я не могу реально скомпилировать сгенерированные файлы классов, чтобы использовать KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG => true, поэтому я использую обычный декодер, не привязываясь к Kafka, а именно просто читая байты как Array[Byte] и передавая их в десериализатор, созданный вручную:

var maxSchemasToCache = 1000;
var schemaRegistryURL = "https://example.com/"
var specificDeserializerProps = Map(
  "schema.registry.url" 
      -> schemaRegistryURL,
  KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG 
      -> "false"
);
var client = new CachedSchemaRegistryClient(
                     schemaRegistryURL, 
                     maxSchemasToCache
                 );
var deserializer = new KafkaAvroDeserializer(
                         client,
                         specificDeserializerProps.asJava
                   );

Сообщения имеют тип «контейнер», с действительно интересной частью одного из ~ 25 типов в поле записи union { A, B, C } msg:

record Event {
    timestamp_ms created_at;
    union {
        Online,
        Offline,
        Available,
        Unavailable,
        ...
        ...Failed,
        ...Updated
    } msg;
}

Итак, я успешно читаю Array[Byte] в record и подаю его в десериализатор следующим образом:

var genericRecord = deserializer.deserialize(topic, consumerRecord.value())
                       .asInstanceOf[GenericRecord];
var schema = genericRecord.getSchema();
var msgSchema = schema.getField("msg").schema();

Однако проблема в том, что я не могу найти ничего, чтобы различить, различить или «разрешить» «тип» поля msg через объединение:

System.out.printf(
    "msg.schema = %s msg.schema.getType = %s\n", 
    msgSchema.getFullName(),  
    msgSchema.getType().name());
=> msg.schema = union msg.schema.getType = union

Как различать типы в этом сценарии? Слитный реестр знает, что у этих вещей есть имена, у них есть «типы», даже если я отношусь к ним как GenericRecords,

Моя цель здесь - узнать, что record.msg имеет «тип» Online | Offline | Available, а не просто зная, что это union.

Ответы [ 3 ]

1 голос
/ 21 января 2020

Я смог придумать одноразовое решение после большого количества копаний:

val records: ConsumerRecords[String, Array[Byte]] = consumer.poll(100);
for (consumerRecord <- asScalaIterator(records.iterator)) {
  var genericRecord = deserializer.deserialize(topic, consumerRecord.value()).asInstanceOf[GenericRecord];
  var msgSchema = genericRecord.get("msg").asInstanceOf[GenericRecord].getSchema();
  System.out.printf("%s \n", msgSchema.getFullName());

Отпечатки com.myorg.SomeSchemaFromTheEnum и отлично работает в моем случае использования.

сбивает с толку то, что из-за использования GenericRecord, .get("msg") возвращает Object, что, в общем-то, у меня нет возможности безопасно переписать. В этом ограниченном случае я знаю, что приведение является безопасным.

В моем ограниченном сценарии использования решение в 5 строках выше подходит, но для более общего решения ответ { ссылка } опубликовано { ссылка } кажется более уместным.

Использование DatumReader или GenericRecord, вероятно, является вопросом предпочтения и имеет ли в виду экосистему Кафки, наедине с Avro I Я, вероятно, предпочел бы решение DatumReader, но в этом случае я могу жить с наличием в своем коде номенклатуры типа Кафак.

1 голос
/ 21 апреля 2020

Чтобы получить схему значения поля, вы можете использовать

new GenericData().induce(genericRecord.get("msg"))
1 голос
/ 21 января 2020

Изучив реализацию библиотеки AVRO Java, можно с уверенностью сказать, что это невозможно, учитывая текущий API. Я нашел следующий способ извлечения типов при синтаксическом анализе с использованием пользовательского подкласса GenericDatumReader, но он требует много полировки, прежде чем я буду использовать что-то подобное в производственном коде: D

Итак, вот подкласс:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.ResolvingDecoder;

import java.io.IOException;
import java.util.List;

public class CustomReader<D> extends GenericDatumReader<D> {
    private final GenericData data;
    private Schema actual;
    private Schema expected;

    private ResolvingDecoder creatorResolver = null;
    private final Thread creator;
    private List<Schema> unionTypes;

    // vvv This is the constructor I've modified, added a list of types
    public CustomReader(Schema schema, List<Schema> unionTypes) {
        this(schema, schema, GenericData.get());
        this.unionTypes = unionTypes;
    }

    public CustomReader(Schema writer, Schema reader, GenericData data) {
        this(data);
        this.actual = writer;
        this.expected = reader;
    }

    protected CustomReader(GenericData data) {
        this.data = data;
        this.creator = Thread.currentThread();
    }

    protected Object readWithoutConversion(Object old, Schema expected, ResolvingDecoder in) throws IOException {
        switch (expected.getType()) {
            case RECORD:
                return super.readRecord(old, expected, in);
            case ENUM:
                return super.readEnum(expected, in);
            case ARRAY:
                return super.readArray(old, expected, in);
            case MAP:
                return super.readMap(old, expected, in);
            case UNION:
                // vvv The magic happens here
                Schema type = expected.getTypes().get(in.readIndex());
                unionTypes.add(type);
                return super.read(old, type, in);
            case FIXED:
                return super.readFixed(old, expected, in);
            case STRING:
                return super.readString(old, expected, in);
            case BYTES:
                return super.readBytes(old, expected, in);
            case INT:
                return super.readInt(old, expected, in);
            case LONG:
                return in.readLong();
            case FLOAT:
                return in.readFloat();
            case DOUBLE:
                return in.readDouble();
            case BOOLEAN:
                return in.readBoolean();
            case NULL:
                in.readNull();
                return null;
            default:
                return super.readWithoutConversion(old, expected, in);
        }
    }
}

Я добавил комментарии к коду для интересных частей, так как это в основном шаблон.

Затем вы можете использовать этот пользовательский ридер следующим образом:

        List<Schema> unionTypes = new ArrayList<>();
        DatumReader<GenericRecord> datumReader = new CustomReader<GenericRecord>(schema, unionTypes);
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(eventFile, datumReader);
        GenericRecord event = null;

        while (dataFileReader.hasNext()) {
            event = dataFileReader.next(event);
        }

        System.out.println(unionTypes);

Для каждого проанализированного union будет напечатан тип этого union. Обратите внимание, что вам придется выяснить, какой элемент этого списка вам интересен, в зависимости от того, сколько союзов у ​​вас есть в записи, и т. Д. c.

Не очень красиво: D

...