Как десериализовать авро файлы - PullRequest
0 голосов
/ 24 января 2019

Я хотел бы прочитать папку hdfs, содержащую файлы avro с помощью spark.Затем я хотел бы десериализовать события avro, содержащиеся в этих файлах.Я хотел бы сделать это без библиотеки com.databrics (или любой другой, которая позволяет делать это легко).

Проблема в том, что у меня возникают трудности с десериализацией.

Я предполагаю, чтомой файл avro сжат с помощью snappy, потому что в начале файла (сразу после схемы) у меня записано

avro.codecsnappy

.Затем следуют читаемые или нечитаемые символы.

Моя первая попытка десериализации события avro выглядит следующим образом:

public static String deserialize(String message) throws IOException {
    Schema.Parser schemaParser = new Schema.Parser();
    Schema avroSchema = schemaParser.parse(defaultFlumeAvroSchema);

    DatumReader<GenericRecord> specificDatumReader = new SpecificDatumReader<GenericRecord>(avroSchema);

    byte[] messageBytes = message.getBytes();
    Decoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null);
    GenericRecord genericRecord = specificDatumReader.read(null, decoder);

    return genericRecord.toString();
}

Эта функция работает, когда я хочу десериализовать файл avro, который не 'там нет avro.codecsbappy.В этом случае у меня появляется ошибка:

Неверные данные: длина отрицательна: -50

Поэтому я попробовал другой способ сделать это:

    private static void deserialize2(String path) throws IOException {
    DatumReader<GenericRecord> reader = new GenericDatumReader<>();
    DataFileReader<GenericRecord> fileReader =
            new DataFileReader<>(new File(path), reader);
    System.out.println(fileReader.getSchema().toString());

    GenericRecord record = new GenericData.Record(fileReader.getSchema());

    int numEvents = 0;
    while (fileReader.hasNext()) {
        fileReader.next(record);
        ByteBuffer body = (ByteBuffer) record.get("body");
        CharsetDecoder decoder = Charsets.UTF_8.newDecoder();
        System.out.println("Positon of the index " + body.position());
        System.out.println("Size of the array : " + body.array().length);
        String bodyStr = decoder.decode(body).toString();
        System.out.println("THE BODY STRING  ---> " bodyStr);
        numEvents++;
    }
    fileReader.close();
}

и возвращает следующий вывод:

Положение индекса 0

Размер массива: 127482

STRING STRING -->

Я вижу, что массив не пустой, а просто возвращает пустую строку.

Как мне продолжить?

Ответы [ 2 ]

0 голосов
/ 24 января 2019

Ну, похоже, вы на правильном пути. Однако ваш ByteBuffer может не иметь правильного массива byte[] для декодирования, поэтому давайте попробуем следующее:

byte[] bytes = new byte[body.remaining()];
buffer.get(bytes);
String result = new String(bytes, "UTF-8"); // Maybe you need to change charset

Это должно работать, вы показали в своем вопросе, что ByteBuffer содержит фактические данные, как показано в примере кода, который вам, возможно, придется изменить кодировку.

Список кодировок: https://docs.oracle.com/javase/7/docs/api/java/nio/charset/Charset.html

Также полезно: https://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html

0 голосов
/ 24 января 2019

Используйте это при преобразовании в строку:

String bodyStr = new String(body.array());
System.out.println("THE BODY STRING  ---> " + bodyStr);

Источник: https://www.mkyong.com/java/how-do-convert-byte-array-to-string-in-java/

...