Я новенький моргать и кафка. Я пытаюсь десериализовать данные avro с помощью реестра Confluent Schema. Я уже установил флинк и кафку на машину ec2. Кроме того, тема «test» была создана до запуска кода.
Путь к коду: https://gist.github.com/mandar2174/5dc13350b296abf127b92d0697c320f2
Код выполняет следующую операцию в рамках реализации:
1) Create a flink DataStream object using a list of user element. (User class is avro generated class)
2) Write the Datastream source to Kafka using AvroSerializationSchema.
3) Read the data from Kafka using ConfluentRegistryAvroDeserializationSchema by reading the schema from Confluent Schema registry.
Команда для запуска исполняемого файла flink:
./bin/flink run -c com.streaming.example.ConfluentSchemaRegistryExample /opt/flink-1.7.2/kafka-flink-stream-processing-assembly-0.1.jar
Исключительная ситуация при выполнении кода:
java.io.IOException: Unknown data format. Magic number does not match
at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:55)
at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:66)
at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Схема Avro, которую я использую для класса User, выглядит следующим образом:
{
"type": "record",
"name": "User",
"namespace": "com.streaming.example",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "favorite_number",
"type": [
"int",
"null"
]
},
{
"name": "favorite_color",
"type": [
"string",
"null"
]
}
]
}
Может ли кто-нибудь указать, какие шаги я пропускаю при десериализации avro-данных с использованием слитного реестра схем Кафки?