У меня есть следующий код:
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());
FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
"test-kafka-topic",
new SimpleStringSchema(),
properties);
final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);
DataStream<String> stringStream = kafkaInputStream
.map(new MapFunction<MyCustomClass,String>() {
@Override
public String map(MyCustomClass message) {
logger.info("--- Received message : " + message.toString());
return message.toString();
}
});
streamEnv.execute("Published messages");
MyCustomClassDeserializer реализован для преобразования байтового массива в java объект.
Когда я запускаю эту программу локально, я получаю ошибку:
Вызывается: org. apache .flink.api.common.functions.InvalidTypesException: Несоответствие ввода: ожидается тип * Basi c.
И я получаю это для строки кода:
.map(new MapFunction<MyCustomClass,String>() {
Не знаете, почему я это понимаю?