KTable suppress (Suppressed.untilTimeLimit ()) не содержит записей указанного времени - PullRequest
0 голосов
/ 13 апреля 2020

Я реализовал приложение потоковой обработки, которое выполняет некоторые вычисления и преобразования и отправляет результат в вывод topi c. После этого я читаю из этой топи c и хочу подавить результаты для 35 ", как таймер, то есть все выходные записи из этого подавления будут отправлены в указанный c" тайм-аут "топи c.

Упрощенный код выглядит следующим образом:

inputStream
     .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(35), Suppressed.BufferConfig.unbounded()))
     .toStream()
     .peek((key, value) -> LOGGER.warn("incidence with Key: {} timeout -> Time = {}", key, 35))
     .filterNot((key, value) -> value.isDisconnection())

Проблема, с которой я столкнулся, заключается в том, что подавление содержит записи в течение произвольного времени, а не заданных 35 секунд. Для получения дополнительной информации Я использую время события, извлеченное в предыдущем процессе, описанном в начале, и записи поступают каждую секунду;

Спасибо

Обновление

Это пример входной записи:

rowtime: 4/8/20 8:26:33 AM UTC, key: 34527882, value: {"incidenceId":"34527882","installationId":"18434","disconnection":false,"timeout":false, "creationDate":"1270801593"}
...