Avro схема Java проблема глубокого копирования с полевым порядком - PullRequest
1 голос
/ 19 мая 2019

В настоящее время я ищу решения для неожиданного поведения при работе с конкретными сценариями эволюции схемы AVRO при использовании Java и выполнении глубокой копии у потребителя для анализа класса GenericRecord в конкретный класс, который был сгенерирован из схемы AVRO.

Чтобы объяснить, что происходит, я буду использовать пример упрощенной схемы:

{
  "name":"SimpleEvent",
  "type":"record",
  "namespace":"com.simple.schemas",
  "fields":[
     {
        "name":"firstfield",
        "type":"string",
        "default":""
     },
     {
        "name":"secondfield",
        "type":"string",
        "default":""
     },
     {
        "name":"thirdfield",
        "type":"string",
        "default":""
     }
  ]
}

Это просто простая схема с тремя строковыми полями, все необязательные, поскольку они имеют значения по умолчанию.Предполагая, что в какой-то момент я хочу добавить другое строковое поле, а также удалить одно поле, поскольку оно больше не требуется, вы получите следующее:

{
  "name":"SimpleEvent",
  "type":"record",
  "namespace":"com.simple.schemas",
  "fields":[
     {
        "name":"firstfield",
        "type":"string",
        "default":""
     },
     {
        "name":"secondfield",
        "type":"string",
        "default":""
     },
     {
        "name":"newfield",
        "type":"string",
        "default":""
     }
  ]
}

Это не должно нарушать изменения в соответствии с правилами эволюции схемы,Однако, когда производитель начинает генерировать события с более новой схемой, у нижестоящих потребителей происходит нечто странное.

Оказывается, сгенерированные классы Java (я использовал плагин Gradle avro для генерации класса, но mavenгенерация кода командной строки в плагинах и инструментах avro дает один и тот же результат) только смотрит на порядок полей и не отображает поля на основе их имени.

Это означает, что значение поля "newfield" отображается на«третье поле» нижестоящими потребителями, которые используют более старую версию схемы для чтения данных.

Я нашел некоторую работу, в которой ручное отображение выполняется на основе имени, однако, чтоне работает для вложенных объектов.

В некоторых локальных экспериментах я также нашел другой подход, который правильно разрешает различия в схемах:

    Schema readerSchema = SimpleEvent.getClassSchema();
    Schema writerSchema = request.getSchema();

    if (readerSchema.equals(writerSchema)){
        return (SimpleEvent)SpecificData.get().deepCopy(writerSchema, request);
    }

    DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(writerSchema);
    BinaryEncoder encoder = null;
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    encoder = EncoderFactory.get().binaryEncoder(stream, encoder);

    writer.write(request, encoder);
    encoder.flush();

    byte[] recordBytes = stream.toByteArray();

    Decoder decoder = DecoderFactory.get().binaryDecoder(recordBytes, null);

    SpecificDatumReader<SimpleEvent> specificDatumReader = new SpecificDatumReader(writerSchema, readerSchema);
    SimpleEvent result = specificDatumReader.read(null, decoder);
    return result;

Однако этот подход выглядит довольно расточительным / неэлегатным, поскольку сначала нужно преобразоватьGenericRecord в byteArray и затем прочитайте его снова, используя SpecificDatumReader.

Разница между классами deepcopy и datumreader заключается в том, что классы datumReader подходят для сценариев, в которых схема записывающего устройства отличается от схемы считывателя.

Я считаю, что должно быть / может быть лучшеБолее элегантный способ справиться с этим.Я был бы очень признателен за любую помощь / советы, как туда добраться.

Заранее спасибо:)

Оскар

1 Ответ

0 голосов
/ 19 мая 2019

После дополнительных копаний и просмотра KafkaAvroDeserializer, который мы ранее использовали в наших слушателях, я заметил, что у AbstractKafkaAvroDeserializer есть функция для десериализации, где вы можете передать схему чтения.Это выглядело хорошо, чтобы быть правдой, но это работает!

package com.oskar.generic.consumer.demo;

import com.simple.schemas;

import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class SimpleEventDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> {

private boolean isKey;

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    this.isKey = isKey;
    configure(new KafkaAvroDeserializerConfig(configs));
}

@Override
public Object deserialize(String s, byte[] bytes) {
    return super.deserialize(bytes, SimpleEvent.getClassSchema());
}

@Override
public void close() {

}
}

Что затем используется на потребительской фабрике так:

@Bean
public ConsumerFactory<String, GenericRecord> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29095");
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "one");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SimpleEventDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(props);
}

И сам код слушателя выглядит так:

 @KafkaListener(topics = "my-topic")
public GenericRecord listen(@Payload GenericRecord request, @Headers MessageHeaders headers) throws IOException {
    SimpleEvent event = (SimpleEvent) SpecificData.get().deepCopy(request.getSchema(), request);
    return request;
}
...