Я выполняю агрегацию в оконном потоке и хочу подавить ранние результаты агрегации. Под ранними результатами я подразумеваю результаты, вычисленные до окончания окна, но не те результаты, которые были получены в течение льготного периода. Таким образом, я хотел бы подавить все результаты агрегирования с помощью отметки времени <конец окна, но переслать все записи с отметкой времени> = конец окна и отметку времени <закрыть окно. </p>
Пример топологии минимальных потоков Кафки:
new StreamsBuilder()
.stream("my-topic")
.windowedBy(TimeWindows.of(myWindowSize).grace(myGracePeriod))
.reduce(myReducer)
.suppress( /* searched for*/ )
.toStream();
Таким образом, Suppressed.untilWindowCloses( .. )
для меня не вариант, так как мне пришлось бы ждать, пока не истечет льготный период, который может быть долгим.
Согласно KIP-328 точно требуемое поведение может быть получено с использованием Suppressed.untilTimeLimit(Duration.ZERO, .. )
as (цитируется в описании KIP):
a. Как долго ждать новых обновлений перед отправкой. Это количество времени, измеряемое либо от времени события (для обычных KTables), либо от конца окна (для оконных KTables), чтобы буферизовать каждый ключ до их передачи в нисходящем направлении.
Однако Kafka Streams JavaDo c, а также соответствующая реализация подразумевают, что это не так, и ограничение по времени начинает обратный отсчет при получении первой записи для каждого (оконного) ключа, а не когда окно заканчивается.
Я был бы рад получить разъяснения по этому поводу и поддержать, как добиться желаемого поведения.