Использование topi c более одного раза в топологии Kafka Stream - PullRequest
0 голосов
/ 23 апреля 2020

Давайте просто предположим, что функция groupby не была доступна в потоках kafka. Могу ли я сделать ниже, чтобы получить количество слов и построить KTable поверх него? Обратите внимание, что в топологии я дважды использую слово-count-topi c. У меня есть сценарий использования, в котором я хочу построить что-то итеративно, и для следующего события потока я хочу просмотреть предыдущее значение и обновить его на основе события. Я хочу сохранить последнее значение в том же топи c, на котором я собираю Ktable.

KTable<String,Long> wordCountTable = builder.table("word-count-topic",Consumed.with(Serdes.String(), Serdes.Long()));

KStream<String,String> wordsStream = builder.stream("words-topic",Consumed.with(Serdes.String(), Serdes.String()));

KStream<String,String> msgStream = wordsStream
                                   .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                                   .selectKey((k,v) -> v);

msgStream.leftJoin(kTable, (word,count) -> {
                                             if( count == null) return new WordCount(word, Long.valueOf(1));
                                             else return new WordCount(word, count + 1);
                                           })
            .mapValues((k,v)-> v.getCount())
            .to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));

streams = new KafkaStreams(builder.build(), props);
streams.start();

1 Ответ

0 голосов
/ 26 апреля 2020

Это должно работать. Почему бы просто не запустить код?

...