KafkStreams: отменить сообщение во время окна - PullRequest
1 голос
/ 18 июня 2019

Необходимо удалить дубликаты сообщения в пределах временного окна. Сообщение поступает постоянно. Сильфон является частью кода.

 kStream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
             .reduce((k,m) -> m)
             .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
             .toStream()
             .foreach((k, v) -> doSomeProcess(k,v));

Что я здесь не так делаю. Я не вижу никакого вызова метода doSomeProcess. Сообщения приходят.

1 Ответ

1 голос
/ 20 июня 2019

Оказалось, что «Эта функция требует добавления параметра« льготный период »для окон» От https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables .... .windowedBy (TimeWindows.of (Duration.ofSeconds (15)). grace (Duration.ofSeconds (5))) ....

Это исправило проблему.

...