ParameterTool parameterTool = ParameterTool.fromArgs(args);
Properties kafkaProperties = new Properties();
InputStream inputStream = EventStream.class.getClassLoader().getResourceAsStream(parameterTool.get("properties"));
kafkaProperties.load(inputStream);
String schemaRegistryUrl = parameterTool.getRequired("schema-registry-url");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer011<HotelsTxnSearchEvent> kafkaConsumer = new FlinkKafkaConsumer011<>(parameterTool.getRequired("topic"), ConfluentRegistryAvroDeserializationSchema.forSpecific(HotelsTxnSearchEvent.class, schemaRegistryUrl), kafkaProperties);
DataStream<HotelsTxnSearchEvent> events = env.addSource(kafkaConsumer);
events.print();
// run the application
env.execute("Sample Run");
Исключение:
Caused by: org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
Мой HotelsTxnSearchEvent - это класс, созданный Avro с использованием схемы компиляции java -jar avro-tools-1.7.1.jar p327.avsc.
Получил немногоархив, где люди столкнулись с той же проблемой:http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3CCAGqa1oKgnzmu_xdvJFnr0wKJpKAXMvhGVgAdSZ7ctFWjJRi2Uw@mail.gmail.com%3E