Flink Kafka Схема AvroDeserialization Исключение нулевого указателя - PullRequest
0 голосов
/ 27 февраля 2019
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    Properties kafkaProperties = new Properties();
    InputStream inputStream = EventStream.class.getClassLoader().getResourceAsStream(parameterTool.get("properties"));
    kafkaProperties.load(inputStream);
    String schemaRegistryUrl = parameterTool.getRequired("schema-registry-url");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    FlinkKafkaConsumer011<HotelsTxnSearchEvent> kafkaConsumer = new FlinkKafkaConsumer011<>(parameterTool.getRequired("topic"), ConfluentRegistryAvroDeserializationSchema.forSpecific(HotelsTxnSearchEvent.class, schemaRegistryUrl), kafkaProperties);

    DataStream<HotelsTxnSearchEvent> events = env.addSource(kafkaConsumer);
    events.print();

    // run the application
    env.execute("Sample Run");

Исключение:

Caused by: org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)

Мой HotelsTxnSearchEvent - это класс, созданный Avro с использованием схемы компиляции java -jar avro-tools-1.7.1.jar p327.avsc.

Получил немногоархив, где люди столкнулись с той же проблемой:http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3CCAGqa1oKgnzmu_xdvJFnr0wKJpKAXMvhGVgAdSZ7ctFWjJRi2Uw@mail.gmail.com%3E

...