Я закончил тем, что создал логику агрегации, которая использует результат агрегации, который был сохранен в теме кафки.Вот логика:
private KStream<String, StringAggregator> getAggregator(String topicName,
KStream<String, String> input,
KTable<String, StringAggregator> aggregator) {
return input
.leftJoin(aggregator, (inputMessage, aggregatorMessage) -> {
if (aggregatorMessage == null) {
aggregatorMessage = new StringAggregator();
}
aggregatorMessage.add(inputMessage);
return aggregatorMessage;
}).peek((k, v) -> logger.info("Aggregated a join input for {}: {}, {} aggregated.", topicName, k, v.size()));
}
Вот логика для фактического построения потока.
String topicName = "input";
KStream<String, String> input = streamsBuilder.stream(topicName);
KTable<String, StringAggregator> aggregator = streamsBuilder.table("aggregate");
getAggregator(topicName, input, aggregator).to("aggregate");