Совершенно новый для Flink, как в начале POC, чтобы мигрировать из Spark, и, конечно, зациклился на чем-то, что я ожидал, чтобы это был глухой данк ...;)
Я создалAvroDeserializationSchema
из Avro Schema
выглядит следующим образом:
AvroDeserializationSchema avroConverter = AvroDeserializationSchema.forGeneric( avroSchema )
Я вижу много примеров, когда AvroDeserializationSchema
подается на FlinkKafkaConsumer011
, но ни один не использует его с источником файла.
Если бы кто-то был достаточно любезен, чтобы указать мне направление, я был бы признателен.
Я видел streamingFlinkEnv.addSource( StreamingDataSource )
и streamingFlinkEnv.readFile()
, но не испытывал радости, собирая эти кусочки головоломки вместес любым другим клеем, который я могу найти.
FWIW: Я действительно предпочитаю избегать создания класса, используя avro-tools, и решил, что метод AvroDeserializationSchema.forGeneric()
был моей спасительной милостью.