Невозможно декодировать пользовательский объект на стороне клиента Avro в Кафке - PullRequest
0 голосов
/ 09 октября 2018

У меня есть конкретный класс, который я сериализирую в байтовом массиве для отправки в тему Кафки.Для сериализации я использую ReflectDatumWriter.Перед отправкой байтов [] я помещаю идентификатор схемы в первые 4 байта с идентификатором схемы после проверки некоторого онлайн-учебника.

Я могу отправить сообщение, но при его использовании в консоли Avro Consumer я получаю ответ как:

. / Bin / kafka-avro-console-customer --bootstrap-server 0: 9092 --property schema.stry.url = http://0:8081 --property print.key= true - Topic Test

"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000"
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000"

    MParams ddb = new MParams();
    ddb.setKey("ss");

    for (int i = 0; i < 10; i++) {
        ProducerRecord record = new ProducerRecord<String, byte[]>("Test", "1", build(1, Producer.serialize(ddb)));
        Future resp = kafkaFullAckProducer.send(record);

        System.out.println("Success" + resp.get());
    }
}

public static <T> byte[] serialize(T data) {
    Schema schema = null;
    if (data == null) {
        throw new RuntimeException("Data cannot be null in AvroByteSerializer");
    }
    try {
        schema = ReflectData.get().getSchema(data.getClass());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        writer.write(data, new EncoderFactory().directBinaryEncoder(out, null));
        byte[] bytes = out.toByteArray();
        return bytes;
    } catch (java.io.IOException e) {
        throw new RuntimeException("Error serializing Avro message", e);
    }
}

public static byte[] build(Integer schemaId, byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0);
    try {
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        out.write(data);
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException e) {
        throw new RuntimeException("Exception in avro record builder , msg :" + e.getMessage());
    }



@Data
public class MParams extends MetricParams{

    // POJO version

    @Nullable
    private String key;


}

@JsonTypeInfo(use = Id.CLASS, include = As.PROPERTY, property = "@c")
@Union(value= {MParams.class})
public abstract class MetricParams {

}

Рабочий фрагмент сериализатора

public byte[] serialize(String topic, T record) {
        Schema schema;
        int id;
        try {
            schema = ReflectData.get().getSchema(record.getClass());
            id = client.register(topic + "-value", schema);
        } catch (IOException | RestClientException e) {
            throw new RuntimeException(e);
        }
        return serializeImpl(id, schema, record);
    }

    protected byte[] serializeImpl(int id, Schema schema, T object) throws SerializationException {
        if (object == null) {
            return null;
        }
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(0x0);
            out.write(ByteBuffer.allocate(4).putInt(id).array());

            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
            DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
            writer.write(object, encoder);
            encoder.flush();
            byte[] bytes = out.toByteArray();
            out.close();
            return bytes;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing Avro message", e);
        }
    }

Десериализатор:

protected T deserialize(Schema schema, byte[] payload) throws SerializationException {
        // Even if the caller requests schema & version, if the payload is null
        // cannot include it. The caller must handle
        // this case.
        if (payload == null) {
            return null;
        }

        int id = -1;
        try {
            ByteBuffer buffer = getByteBuffer(payload);
            id = buffer.getInt();
            int length = buffer.limit() - 1 - 4;

            int start = buffer.position() + buffer.arrayOffset();
            DatumReader<T> reader = new ReflectDatumReader<T>(schema);
            T res = reader.read(null, new DecoderFactory().binaryDecoder(buffer.array(), start, length, null));
            return res;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing Avro message for id " + id, e);
        }
    }

    private ByteBuffer getByteBuffer(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != 0x0) {
            throw new SerializationException("Unknown magic byte!");
        }
        return buffer;
    }

1 Ответ

0 голосов
/ 09 октября 2018

Для сериализации я использую ReflectDatumWriter.Перед отправкой байтов [] я помещаю идентификатор схемы в первые 4 байта с идентификатором схемы

Не ясно, почему вы пытаетесь обойти поведение класса KafkaAvroSerializer по умолчанию .(В вашем случае удалите Schema.Parser из этого примера и используйте свой тип записи Reflect вместо GenericRecord)

Вы можете поместить ваш конкретный класс в качестве второго типа производителя и до тех пор, пока он реализуетбазовые классы Avro, он должен быть правильно сериализован (имеется в виду, что ID вычисляется правильно, а не какое-то число, которое вы создаете, и конвертируется в байты), регистрируется в реестре, а затем отправляется в Kafka

.не обязательно 1 в реестре, и, указав это, потребитель консоли может попытаться десериализовать ваши сообщения неправильно, что приведет к неправильному выводу

Другими словами, попробуйте

ProducerRecord<String, MParams> record = new ProducerRecord<>(...)
...