Я хотел бы прочитать папку hdfs, содержащую файлы avro с помощью spark.Затем я хотел бы десериализовать события avro, содержащиеся в этих файлах.Я хотел бы сделать это без библиотеки com.databrics (или любой другой, которая позволяет делать это легко).
Проблема в том, что у меня возникают трудности с десериализацией.
Я предполагаю, чтомой файл avro сжат с помощью snappy, потому что в начале файла (сразу после схемы) у меня записано
avro.codecsnappy
.Затем следуют читаемые или нечитаемые символы.
Моя первая попытка десериализации события avro выглядит следующим образом:
public static String deserialize(String message) throws IOException {
Schema.Parser schemaParser = new Schema.Parser();
Schema avroSchema = schemaParser.parse(defaultFlumeAvroSchema);
DatumReader<GenericRecord> specificDatumReader = new SpecificDatumReader<GenericRecord>(avroSchema);
byte[] messageBytes = message.getBytes();
Decoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null);
GenericRecord genericRecord = specificDatumReader.read(null, decoder);
return genericRecord.toString();
}
Эта функция работает, когда я хочу десериализовать файл avro, который не 'там нет avro.codecsbappy.В этом случае у меня появляется ошибка:
Неверные данные: длина отрицательна: -50
Поэтому я попробовал другой способ сделать это:
private static void deserialize2(String path) throws IOException {
DatumReader<GenericRecord> reader = new GenericDatumReader<>();
DataFileReader<GenericRecord> fileReader =
new DataFileReader<>(new File(path), reader);
System.out.println(fileReader.getSchema().toString());
GenericRecord record = new GenericData.Record(fileReader.getSchema());
int numEvents = 0;
while (fileReader.hasNext()) {
fileReader.next(record);
ByteBuffer body = (ByteBuffer) record.get("body");
CharsetDecoder decoder = Charsets.UTF_8.newDecoder();
System.out.println("Positon of the index " + body.position());
System.out.println("Size of the array : " + body.array().length);
String bodyStr = decoder.decode(body).toString();
System.out.println("THE BODY STRING ---> " bodyStr);
numEvents++;
}
fileReader.close();
}
и возвращает следующий вывод:
Положение индекса 0
Размер массива: 127482
STRING STRING -->
Я вижу, что массив не пустой, а просто возвращает пустую строку.
Как мне продолжить?