как получить информацию о теме в flinkKafkaConsumer? - PullRequest
0 голосов
/ 09 октября 2019

в клиенте kafka или Spark они могут использовать ConsumerRecord для получения имени темы или смещения темы для каждой записи. Но как сделать то же самое в Flink?

мой псевдокод, как

      FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(topicList, newSchemaUtils.MyDeserializeSchema(), properties);

        DataStream<MyKafkaPkg> input = env.addSource(kafkaConsumer);

 SingleOutputStreamOperator<MyKafkaPkg> mainDataStream = input
                .process(...)


    public static class MyDeserializeSchema implements DeserializationSchema<MyKafkaPkg> {
        public MyKafkaPkgdeserialize(byte[] message) throws IOException{

            MyKafkaPkg event = MyKafkaPkg.parseFrom(message);
            return event;
        }

        public boolean isEndOfStream(MyKafkaPkg nextElement){
            return false;
        }

        public TypeInformation<MyKafkaPkg> getProducedType(){
            return TypeInformation.of(MyKafkaPkg.class);
        }
    }


...