в клиенте kafka или Spark они могут использовать ConsumerRecord для получения имени темы или смещения темы для каждой записи. Но как сделать то же самое в Flink?
мой псевдокод, как
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(topicList, newSchemaUtils.MyDeserializeSchema(), properties);
DataStream<MyKafkaPkg> input = env.addSource(kafkaConsumer);
SingleOutputStreamOperator<MyKafkaPkg> mainDataStream = input
.process(...)
public static class MyDeserializeSchema implements DeserializationSchema<MyKafkaPkg> {
public MyKafkaPkgdeserialize(byte[] message) throws IOException{
MyKafkaPkg event = MyKafkaPkg.parseFrom(message);
return event;
}
public boolean isEndOfStream(MyKafkaPkg nextElement){
return false;
}
public TypeInformation<MyKafkaPkg> getProducedType(){
return TypeInformation.of(MyKafkaPkg.class);
}
}