Вот простое определение окна с оператором подавления:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.suppress(untilWindowCloses(unbounded())
.toStream()
// process last event here
...
Итак, мой вопрос: как оператор подавления определяет, является ли событие последним событием окна? давайте представим, я удаляю оператор подавления:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.toStream()
...
Я понимаю, что для каждого изменения в KTable
будут сгенерированы два события:
- Запись с
null
значение для удаления предыдущей записи - Новая запись с новым значением
Я хочу удалить оператор suppress
и самостоятельно обнаружить последнюю запись:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.toStream()
.filter( /* detect the last record here */ )
Предоставляется ли эта информация в DSL или API процессора?