Я хотел бы попросить совета по следующей проблеме. Я пытаюсь научиться выполнять сериализацию данных Avro с использованием nodejs без реестра схемы, опубликовать sh в кластере Kafka и затем извлечь его в потоках Kafka (Java).
На стороне javascript я попытался использовать kafka-node вместе с avs c для сериализации. В Kafka Streams я решил реализовать пользовательский Serde, поскольку, насколько я понимаю, Avro Serdes, предоставляемые Streams API, предназначены для получения схем непосредственно из реестра схем.
Вот код javascript Фрагмент простого производителя:
const avro = require('avsc');
const messageKey = "1";
const schemaType = avro.Type.forSchema({
type: "record",
name: "product",
fields: [
{
name: "id",
type: "int"
},
{
name: "name",
type: "string"
},
{
name: "price",
type: "double"
},
{
name: "stock",
type: "int"
}
]
});
const messageValueBuffer = schemaType.toBuffer({id, name, stock, price});
const payload = [{topic: 'product', key: messageKey, messages: messageValueBuffer, partition: 0}];
producer.send(payload, sendCallback);
А вот как я сейчас пытаюсь реализовать десериализатор:
public Product deserialize(String topic, byte[] data) {
SeekableByteArrayInput inputstream = new SeekableByteArrayInput(data);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
DataFileReader<GenericRecord> dataFileReader;
Product product = null;
try {
dataFileReader = new DataFileReader<GenericRecord>(inputstream, datumReader);
GenericRecord record = new GenericData.Record(schema);
while(dataFileReader.hasNext()) {
dataFileReader.next();
product = genericRecordToObject(record, new Product());
}
} catch (IOException e) {
e.printStackTrace();
}
return product;
}
Однако, когда потоковое приложение пытается десериализовать данные, я столкнулся со следующей ошибкой, особенно в строке кода, где создается экземпляр DataFileReader:
org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:111)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:106)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:98)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:138)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:128)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:1)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:168)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:109)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:156)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:808)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:925)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:763)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Я не знаю, как действовать дальше. Любой совет будет оценен.