Nodejs Авро сериализация без реестра схемы с последующей десериализацией в Kafka Streams - PullRequest
0 голосов
/ 06 марта 2020

Я хотел бы попросить совета по следующей проблеме. Я пытаюсь научиться выполнять сериализацию данных Avro с использованием nodejs без реестра схемы, опубликовать sh в кластере Kafka и затем извлечь его в потоках Kafka (Java).

На стороне javascript я попытался использовать kafka-node вместе с avs c для сериализации. В Kafka Streams я решил реализовать пользовательский Serde, поскольку, насколько я понимаю, Avro Serdes, предоставляемые Streams API, предназначены для получения схем непосредственно из реестра схем.

Вот код javascript Фрагмент простого производителя:

const avro = require('avsc');

const messageKey = "1";

const schemaType = avro.Type.forSchema({
    type: "record",
    name: "product",
    fields: [
        {
        name: "id",
        type: "int"
        },
        {
        name: "name",
        type: "string"
        },
        {
        name: "price",
        type: "double"
        },
        {
        name: "stock",
        type: "int"
        }
    ]
});

const messageValueBuffer = schemaType.toBuffer({id, name, stock, price});
const payload = [{topic: 'product', key: messageKey, messages: messageValueBuffer, partition: 0}];
producer.send(payload, sendCallback);

А вот как я сейчас пытаюсь реализовать десериализатор:

public Product deserialize(String topic, byte[] data) {
    SeekableByteArrayInput inputstream =  new SeekableByteArrayInput(data);

    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);

    DataFileReader<GenericRecord> dataFileReader;

    Product product = null;
    try {
        dataFileReader = new DataFileReader<GenericRecord>(inputstream, datumReader);
        GenericRecord record = new GenericData.Record(schema);

        while(dataFileReader.hasNext()) {
            dataFileReader.next();
            product = genericRecordToObject(record, new Product());
        }

    } catch (IOException e) {
        e.printStackTrace();

    }   
    return product;
}

Однако, когда потоковое приложение пытается десериализовать данные, я столкнулся со следующей ошибкой, особенно в строке кода, где создается экземпляр DataFileReader:

org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
    at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:111)
    at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:106)
    at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:98)
    at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:138)
    at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:128)
    at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:1)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:168)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:109)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:156)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:925)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:763)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)

Я не знаю, как действовать дальше. Любой совет будет оценен.

1 Ответ

1 голос
/ 08 марта 2020

Может быть, я ошибаюсь, но я думаю, что вы не должны использовать DataFileReader, только DatumReader.

Я сделал нечто подобное в kafka (не в Kafka Streams), возможно, может дать вам несколько идей:

Полный пример (очень простой) приведен здесь: https://github.com/anigmo97/KafkaRecipes/blob/master/java/consumers/StringKeyAvroValueConsumers/StandardAvro/StandardAvroConsumer.java

Как видите, я не создал Serializer, я десериализовал значение и получил Generi c Запись.

public static void main(String[] args) {
        final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProperties());
        consumer.subscribe(Collections.singleton(TOPIC));
        ConsumerRecords<String, byte[]> consumerRecords;
        String valueSchemaString = "{\"type\": \"record\",\"namespace\": \"example.avro\",\"name\": \"test_record\","
                + "\"fields\":[" + "{\"name\": \"id\",\"type\": \"int\"},"
                + "{\"name\": \"date\",\"type\": [\"int\", \"null\"]}," + "{\"name\": \"info\",\"type\": \"string\"}"
                + "]}}";
        Schema avroValueSchema = new Schema.Parser().parse(valueSchemaString);
        SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(avroValueSchema);
        try {
            while (true) {
                consumerRecords = consumer.poll(1000);

                consumerRecords.forEach(record -> {
                    ByteArrayInputStream inputStream = new ByteArrayInputStream(record.value());
                    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);
                    GenericRecord deserializedValue = null;
                    try {
                        deserializedValue = datumReader.read(null, binaryDecoder);
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.printf("Consumer Record:(%s, %s)\n", record.key(), deserializedValue);
                });

                consumer.commitAsync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
            System.out.println("DONE");
        }

    }

Надеюсь, это поможет.

...