apache моргание с Кафкой: InvalidTypesException - PullRequest
0 голосов
/ 04 мая 2020

У меня есть следующий код:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
                    "test-kafka-topic",
                    new SimpleStringSchema(),
                    properties);

final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);

DataStream<String> stringStream = kafkaInputStream
                    .map(new MapFunction<MyCustomClass,String>() {
                        @Override
                        public String map(MyCustomClass message) {
                            logger.info("--- Received message : " + message.toString());
                            return message.toString();
                        }
                    });

streamEnv.execute("Published messages");

MyCustomClassDeserializer реализован для преобразования байтового массива в java объект.

Когда я запускаю эту программу локально, я получаю ошибку:

Вызывается: org. apache .flink.api.common.functions.InvalidTypesException: Несоответствие ввода: ожидается тип * Basi c.

И я получаю это для строки кода:

.map(new MapFunction<MyCustomClass,String>() {

Не знаете, почему я это понимаю?

1 Ответ

1 голос
/ 04 мая 2020

Итак, у вас есть десериализатор, который возвращает POJO, но вы говорите Flink, что он должен десериализовать запись с byte[] до String с помощью SimpleStringSchema. Видишь проблему сейчас? :)

Не думаю, что вам следует использовать пользовательские десериализаторы Kafka в FlinkKafkaConsumer в целом. Вместо этого вы должны стремиться создать собственный класс, который расширяет DeserializationSchema от Flink. Это должно быть намного лучше с точки зрения безопасности типов и тестируемости.

...