Давайте просто предположим, что функция groupby не была доступна в потоках kafka. Могу ли я сделать ниже, чтобы получить количество слов и построить KTable поверх него? Обратите внимание, что в топологии я дважды использую слово-count-topi c. У меня есть сценарий использования, в котором я хочу построить что-то итеративно, и для следующего события потока я хочу просмотреть предыдущее значение и обновить его на основе события. Я хочу сохранить последнее значение в том же топи c, на котором я собираю Ktable.
KTable<String,Long> wordCountTable = builder.table("word-count-topic",Consumed.with(Serdes.String(), Serdes.Long()));
KStream<String,String> wordsStream = builder.stream("words-topic",Consumed.with(Serdes.String(), Serdes.String()));
KStream<String,String> msgStream = wordsStream
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.selectKey((k,v) -> v);
msgStream.leftJoin(kTable, (word,count) -> {
if( count == null) return new WordCount(word, Long.valueOf(1));
else return new WordCount(word, count + 1);
})
.mapValues((k,v)-> v.getCount())
.to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
streams = new KafkaStreams(builder.build(), props);
streams.start();