Исключение при десериализации авро-данных с использованием ConfluentSchemaRegistry? - PullRequest
1 голос
/ 22 апреля 2019

Я новенький моргать и кафка. Я пытаюсь десериализовать данные avro с помощью реестра Confluent Schema. Я уже установил флинк и кафку на машину ec2. Кроме того, тема «test» была создана до запуска кода.

Путь к коду: https://gist.github.com/mandar2174/5dc13350b296abf127b92d0697c320f2

Код выполняет следующую операцию в рамках реализации:

1) Create a flink DataStream object using a list of user element. (User class is avro generated class)
2) Write the Datastream source to Kafka using AvroSerializationSchema.
3) Read the data from Kafka using ConfluentRegistryAvroDeserializationSchema by reading the schema from Confluent Schema registry.

Команда для запуска исполняемого файла flink:

./bin/flink run -c com.streaming.example.ConfluentSchemaRegistryExample /opt/flink-1.7.2/kafka-flink-stream-processing-assembly-0.1.jar

Исключительная ситуация при выполнении кода:

java.io.IOException: Unknown data format. Magic number does not match
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:55)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:66)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

Схема Avro, которую я использую для класса User, выглядит следующим образом:

{
  "type": "record",
  "name": "User",
  "namespace": "com.streaming.example",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "favorite_color",
      "type": [
        "string",
        "null"
      ]
    }
  ]
}

Может ли кто-нибудь указать, какие шаги я пропускаю при десериализации avro-данных с использованием слитного реестра схем Кафки?

1 Ответ

1 голос
/ 22 апреля 2019

Для написания данных Avro необходимо также использовать реестр, чтобы десериализатор, который от него зависит, работал.

Но это открытый пиар в Flink, все же для добавления ConfluentRegistryAvroSerializationSchema класса

Обходной путь, я полагаю, будет использовать AvroDeserializationSchema, который не зависит от реестра.

Если вы хотите использовать Реестр в коде производителя, вам придется делать это за пределами Flink, пока этот PR не будет объединен.

...