Я пытаюсь отправить данные через поток Kinesis в формате Avro.Но я столкнулся с проблемой - IllegalArgumentException возникает все время, вот мой код:
user.avsc
{
"doc": "User schema",
"namespace": "avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"}
]
}
Производитель:
User user = User.newBuilder().setName("Igor").build()
byte[] bytes = toAvro(user);
kinesis.putRecord(
new PutRecordRequest()
.withStreamName(streamName)
.withData(ByteBuffer.wrap(bytes))
.withPartitionKey("test_partition_key")
);
private static byte[] toAvro(GenericContainer obj) throws IOException {
DatumWriter datumWriter = new SpecificDatumWriter();
byte[] bytes;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataFileWriter dataFileWriter = new DataFileWriter(datumWriter)) {
dataFileWriter.create(obj.getSchema(), baos);
dataFileWriter.append(obj);
dataFileWriter.flush();
baos.flush();
bytes = baos.toByteArray();
}
System.out.println(new String(bytes, StandardCharsets.UTF_8));
return bytes;
}
Потребитель:
public class DefaultRecordProcessor implements IRecordProcessor {
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
System.out.println("Processing " + records.size() + " records");
List<String> data = records.stream()
.map(Record::getData)
.map(d -> new String(d.array(), Charset.forName("UTF-8")))
.map(this::convert)
.collect(toList());
}
private GenericData convert(String data) {
SpecificData specificData = new SpecificData();
GenericData result = null;
DatumReader<GenericData> datumReader = new SpecificDatumReader<>(null, schema, specificData);
DataFileReader<GenericData> dataFileReader = new DataFileReader<>(new SeekableByteArrayInput(data.getBytes()), datumReader);
while (dataFileReader.hasNext()) {
result = dataFileReader.next(result);
}
return result;
}
}
Stacktrace:
java.lang.IllegalArgumentException: null
at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
at org.apache.avro.io.BinaryDecoder.readBytes(BinaryDecoder.java:288)
at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:112)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:97)
Я нашелочень похожая проблема, но она мне не помогла - Ошибка при запросе таблицы кустов с авро-поддержкой: java.lang.IllegalArgumentException
ОБНОВЛЕНИЕ: но если я неt передача данных через поток Kinesis работает правильно:
GenericContainer data = User.newBuilder().setName("Igor").build();
// write
DatumWriter datumWriter = new SpecificDatumWriter<>(data.getSchema(), new SpecificData());
byte[] bytes;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataFileWriter dataFileWriter = new DataFileWriter(datumWriter)) {
dataFileWriter.create(data.getSchema(), baos);
dataFileWriter.append(data);
dataFileWriter.flush();
baos.flush();
bytes = baos.toByteArray();
}
System.out.println(new String(bytes, StandardCharsets.UTF_8));
// read
SpecificData specificData = new SpecificData();
Object result = null;
DatumReader datumReader = new SpecificDatumReader<>(null, data.getSchema(), specificData);
DataFileReader dataFileReader = new DataFileReader<>(new SeekableByteArrayInput(bytes), datumReader);
while (dataFileReader.hasNext()) {
result = dataFileReader.next(result);
}
System.out.println(result);
ВЫХОД:
bjavro.schema�{"type":"record","name":"User","namespace":"avro","doc":"User schema","fields":[{"name":"name","type":{"type":"string","avro.java.string":"String"}}]}������ &�4^�[l
Igor������ &�4^�[l
{"name": "Igor"}
, но с другой стороны, если я установлю данные напрямую:
bytes = "Obj\u0001\u0002\u0016avro.schema�\u0002{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"avro\",\"doc\":\"User schema\",\"fields\":[{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}\u0000z4v܉I��PJ19���#\u0002\n".getBytes();
Я все еще получаю ошибку