Я работал над обновлением процессора Flink (версия Flink 1.9), который читает из Kafka, а затем пишет в Kafka. Мы написали этот процессор для работы с кластером Kafka 0.10.2, и теперь мы развернули новый кластер Kafka с версией 2.2. Поэтому я решил обновить процессор, чтобы использовать последние версии FlinkKafkaConsumer и FlinkKafkaProducer (в соответствии с рекомендациями документации Flink). Однако я столкнулся с некоторыми проблемами с производителем Kafka. Я не могу заставить его сериализовать данные, используя устаревшие конструкторы (что неудивительно), и я не смог найти в Интернете никаких реализаций или примеров того, как реализовать Serializer (все примеры используют более старые соединители Kafka)
Текущая реализация (для Kafka 0.10.2) выглядит следующим образом
FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
"playerSessions",
new SimpleStringSchema(),
producerProps,
(FlinkKafkaPartitioner) null
);
При попытке реализовать следующее FlinkKafkaProducer
FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
"playerSessions",
new SimpleStringSchema(),
producerProps,
null
);
я получаю следующую ошибку:
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)
и я так и не смог понять почему. Конструктор для FlinkKafkaProducer также устарел, и когда я пытаюсь реализовать неосуждаемый конструктор, я не могу понять, как сериализовать данные. Вот как это будет выглядеть:
FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
"playerSessions",
new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
return null;
}
},
producerProps,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
Но я не понимаю, как реализовать KafkaSerializationSchema, и я не нахожу примеров этого в Интернете или в документах Flink.
У кого-нибудь есть опыт реализации этого или какие-либо советы о том, почему FlinkProducer получает исключение NullPointerException на этом шаге?