Газ в потоках Кафки - PullRequest
       15

Газ в потоках Кафки

2 голосов
/ 20 июня 2019

По умолчанию .windowedBy(SessionWindows.with(...)) будет возвращать каждую новую входящую запись.Итак, как я могу ждать по крайней мере 1 секунду, прежде чем вернуть последний результат текущего окна сеанса?

Я пытаюсь с примером подсчета слов:

        final KStream<String, String> source = builder.stream("streams-plaintext-input");

        final KStream<String, Long> wordCounts = source

                // Split each text line, by whitespace, into words.
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))

                // Group the stream by word to ensure the key of the record is the word.
                .groupBy((key, word) -> word)

                .windowedBy(SessionWindows.with(Duration.ofSeconds(10)))

                // Count the occurrences of each word (message key).
                .count(Materialized.with(Serdes.String(), Serdes.Long()))

                .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(1), Suppressed.BufferConfig.unbounded()))

                // Convert to KStream<String, Long>
                .toStream((windowedId, count) -> windowedId.key());

        wordCounts.foreach((word, count) -> {
            System.out.println(word + " : " + count);
        });

Это вход производителяи привести к клиенту, который на самом деле не так:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
>hello kafka stream

(nothing)

>hello kafka stream

hello : 1
kafka : 1
stream : 1

>hello kafka stream

hello : null
kafka : 1
stream : 1

Как я могу это исправить?Большое спасибо за чтение моего вопроса:)

1 Ответ

0 голосов
/ 25 июня 2019

KTable#suppress() оператор - ваш друг в этом случае.

Ознакомьтесь с документами и подробным сообщением в блоге об этом:

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...