Вызывается: org.apache.avro.AvroRuntimeException: искаженные данные. Длина отрицательна: -53 - PullRequest
0 голосов
/ 05 июня 2019

Попытка использовать Flink для чтения потока Kafka сериализованных данных "avro", например:

        tableEnv.connect(new Kafka()
                .version("0.11")
                .topic(source.getTopic())
                .properties(source.getProperties())
                .startFromLatest())       
        .withSchema(Schemafy.getSchemaFromJson(source.getAvroSchema()))
                .withFormat(new Avro()  
                    .avroSchema("{  \"namespace\": \"io.avrotweets\",  \"type\": \"record\",  \"name\": \"value\",  \"fields\": [    {      \"type\": \"string\",      \"name\": \"id\"    },    {      \"type\": \"string\",      \"name\": \"screen_name\"    },    {      \"type\": \"string\",      \"name\": \"text\"    }  ]}")
                )
                .inAppendMode()
                .registerTableSource(source.getName());

Я получаю следующее исключение:

java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:170)
    at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -53
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:122)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)

Я думаю, что проблемазаключается в том, что ключ сообщения также был сериализован, но с использованием собственной схемы:

{
  "namespace": "io.avrotweets",
  "type": "record",
  "name": "key",
  "fields": [
    {
      "type": "string",
      "name": "name"
    }
  ]
}

, но где я могу указать соединителю использовать эту схему для ключа.В любом случае, я не знаю, в этом проблема или нет, просто предположение.

1 Ответ

0 голосов
/ 06 июня 2019

Схема другая.Для сериализации вы используете разное количество полей, разные имена полей, другое имя записи.Afaik, вы должны иметь ту же схему avro для того же объекта.Если вы хотите десериализовать только некоторые объекты, подумайте, что вы можете использовать параметр «по умолчанию».

...