Avro не удается десериализовать сообщение с обновленной схемой - PullRequest
0 голосов
/ 20 декабря 2018

У меня есть схема, которая была обновлена, чтобы включить новое поле.Для десериализации / сериализации данных я использую Avro Reflection и системный реестр схем, например:

Сериализация:

Schema schema = REFLECT_DATA.getSchema(value.getClass());
try {
    int registeredSchemaId = this.schemaRegistry.register(subject, schema);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0);
    out.write(ByteBuffer.allocate(4).putInt(registeredSchemaId).array());

    DatumWriter<Object> dw = new ReflectDatumWriter<>(schema);
    Encoder encoder = ENCODER_FACTORY.directBinaryEncoder(out, null);
    dw.write(value, encoder);
    encoder.flush();
    return out.toByteArray();
} catch (RuntimeException | IOException e) {
    throw new SerializationException("Error serializing Avro message", e);
} catch (RestClientException e) {
    throw new SerializationException("Error registering Avro schema: " + schema, e);
}

Десериализация:

if (readerSchema == null) {
    readerSchema = new Schema.Parser().parse(schemaString);
}

int schemaId = -1;
try {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
    }

    schemaId = buffer.getInt();
    Schema writerSchema = schemaRegistry.getById(schemaId);

    int start = buffer.position() + buffer.arrayOffset();
    int length = buffer.limit() - 1 - idSize;
    DatumReader<Object> reader = new ReflectDatumReader<>(writerSchema, readerSchema);
    BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
    return reader.read(null, decoder);  //line 83
} catch (IOException e) {
    throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
} catch (RestClientException e) {
    throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
}

Схемаопределенный классом случая scala, старый выглядит следующим образом:

case class Data(oldField: String) {
    def this("")
}

, и он был обновлен следующим образом:

case class Data(oldField: String, @AvroDefault("") newField: String) {
    def this("", "")
}

Однако десериализация иногда вызывает исключение AvroTypeException со следующимstack:

Caused by: org.apache.avro.AvroTypeException: Found com.company.project.DataHolder$.Data, expecting com.company.project.DataHolder$.Data
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
    at io.fama.pubsub.KafkaAvroReflectDeserializer.deserialize(KafkaAvroReflectDeserializer.java:83)

Что, я думаю, вызвано трудностями сериализации старых сообщений (но я не совсем уверен - я просто не могу понять, что еще это может быть).Кто-нибудь еще сталкивался с этой ошибкой или у кого-нибудь есть идеи, чтобы исправить ее?

1 Ответ

0 голосов
/ 22 декабря 2018

Если вы используете атрибуты org.apache.avro.reflect, то я не думаю, что вы можете использовать классы случаев Scala - параметры класса случаев Scala являются неизменяемыми, и я считаю, что для отображения атрибутов потребуется классс открытым пустым конструктором и видимыми полями Java, возможно, даже @BeanProperty для генерации установщиков / получателей Java.

...