Я реализовал приложение потоковой обработки, которое выполняет некоторые вычисления и преобразования и отправляет результат в вывод topi c. После этого я читаю из этой топи c и хочу подавить результаты для 35 ", как таймер, то есть все выходные записи из этого подавления будут отправлены в указанный c" тайм-аут "топи c.
Упрощенный код выглядит следующим образом:
inputStream
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(35), Suppressed.BufferConfig.unbounded()))
.toStream()
.peek((key, value) -> LOGGER.warn("incidence with Key: {} timeout -> Time = {}", key, 35))
.filterNot((key, value) -> value.isDisconnection())
Проблема, с которой я столкнулся, заключается в том, что подавление содержит записи в течение произвольного времени, а не заданных 35 секунд. Для получения дополнительной информации Я использую время события, извлеченное в предыдущем процессе, описанном в начале, и записи поступают каждую секунду;
Спасибо
Обновление
Это пример входной записи:
rowtime: 4/8/20 8:26:33 AM UTC, key: 34527882, value: {"incidenceId":"34527882","installationId":"18434","disconnection":false,"timeout":false, "creationDate":"1270801593"}