Flink ParquetAvroWriters дает литье ОШИБКА - PullRequest
0 голосов
/ 19 января 2020

Я написал пример кода для записи потока в формат паркета после чтения GenericRecord из Kafka

 Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9092");
        config.setProperty("group.id", "1");
        config.setProperty("zookeeper.connect", "localhost:2181");
        String schemaRegistryUrl = "http://127.0.0.1:8081";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        File file = new File(EventProcessor.class.getClassLoader().getResource("event.avsc").getFile());
        Schema schema = new Schema.Parser().parse(file);

        DataStreamSource<GenericRecord> input = env
                .addSource(
                        new FlinkKafkaConsumer010<GenericRecord>("event_new",
                                new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                                config).setStartFromEarliest());


        Path path = new Path("/tmp");

        final StreamingFileSink sink = StreamingFileSink.forBulkFormat
                (path, ParquetAvroWriters.forGenericRecord(schema)).build();

        input.addSink(sink);

Когда я запускаю этот код, я получаю сообщение об ошибке:

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

Caused by: java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.getField(GenericData.java:697)
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:188)

Я не могу понять, что не так. Пожалуйста, помогите мне понять и решить эту проблему.

1 Ответ

0 голосов
/ 20 января 2020

Наиболее вероятная причина в том, что ваш файл event.avs c не соответствует записям, хранящимся в Kafka. Он находит строку, где он ожидает запись.

Если вы добавите схему и пример записи из Kafka (например, напечатанный с консолью-потребителем), то я мог бы помочь больше.

...