По умолчанию .windowedBy(SessionWindows.with(...))
будет возвращать каждую новую входящую запись.Итак, как я могу ждать по крайней мере 1 секунду, прежде чем вернуть последний результат текущего окна сеанса?
Я пытаюсь с примером подсчета слов:
final KStream<String, String> source = builder.stream("streams-plaintext-input");
final KStream<String, Long> wordCounts = source
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
// Group the stream by word to ensure the key of the record is the word.
.groupBy((key, word) -> word)
.windowedBy(SessionWindows.with(Duration.ofSeconds(10)))
// Count the occurrences of each word (message key).
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(1), Suppressed.BufferConfig.unbounded()))
// Convert to KStream<String, Long>
.toStream((windowedId, count) -> windowedId.key());
wordCounts.foreach((word, count) -> {
System.out.println(word + " : " + count);
});
Это вход производителяи привести к клиенту, который на самом деле не так:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
>hello kafka stream
(nothing)
>hello kafka stream
hello : 1
kafka : 1
stream : 1
>hello kafka stream
hello : null
kafka : 1
stream : 1
Как я могу это исправить?Большое спасибо за чтение моего вопроса:)