KStreams: Как вы получаете (исходную) тему записи? - PullRequest
1 голос
/ 31 мая 2019

У меня есть следующее

//Config setup
Properties props = ...; //setup

List<String> topicList = Arrays.asList({"A", "B", "C"});

StreamBuilder builder = new StreamBuilder();
KStream<String, String> source = builder.stream(topicList);

source
  .map((k,v) -> {

    //How can i get the topic of the record here

  })
  .to((k,v,r) -> {//busy code for topic routing});

new KafkaStream(builder.build(), properties).start();

1 Ответ

3 голосов
/ 01 июня 2019

Вы можете получить нужное название темы, используя ProcessorContext.topic () . Чтобы получить доступ к ProcessorContext, используйте KStream.process (), предоставив ему соответствующую реализацию Processor .

Также вы можете использовать KStream.transform ():

KStream<InputKeyType, InputValueType> stream2 = stream.transform(new TransformerSupplier<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>>() {
            @Override
            public Transformer<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>> get() {
                return new Transformer<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>>() {
                    private ProcessorContext context;

                    @Override
                    public void init(ProcessorContext context) {
                        this.context = context;
                    }

                    @Override
                    public KeyValue<OutputKeyType, OutputValueType> transform(InputKeyType key, InputValueType value) {

                        this.context.topic() // topic name you need
                        // logic here
                        return new KeyValue<>(OutputKeyType key, OutputValueType value);

                    }

                    @Override
                    public void close() {

                    }
                };
            }
        });
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...