У меня проблема с агрегацией KStreams и окнами. Я хочу объединить запись в список записей, которые имеют одинаковый ключ, если он попадает во временное окно.
Я выбрал SessionWindows, потому что мне нужно работать с движущимся окном внутри сессии: скажем, запись A прибывает в 10:00:00; затем все остальные записи с тем же ключом, который прибывает
внутри 10-секундного окна время (до 10:00:10) попадет в тот же сеанс, имея в виду, что если оно прибудет в 10:00:03, окно будет перемещаться до 10:00:13 (+ 10 с).
Это приводит к тому, что у нас есть движущееся окно + 10 с от последней записи, полученной для данного ключа.
Теперь проблема: я хочу получить последний агрегированный результат. Я использовал .suppress (), чтобы указать, что я не хочу никаких промежуточных результатов, я просто хочу последний, когда окно закрывается. это
не работает нормально, потому что, хотя он не отправляет промежуточный агрегированный результат, когда заканчивается временное окно, я не получаю никакого результата. Я отметил, что для того, чтобы получить его, мне нужно опубликовать другой
сообщение в тему, то, что в моем случае невозможно.
Читая о .suppress (), я пришел к выводу, что это может быть не тот путь, которого я хочу достичь, поэтому я задаюсь вопросом: как заставить окно закрыться и отправить последний агрегированный вычисленный результат?
@StreamListener(ExtractContractBinding.RECEIVE_PAGE)
@SendTo(ExtractCommunicationBinding.AGGREGATED_PAGES)
public KStream<String, List<Records>> aggregatePages(KStream<?, Record> input) {
input.map(this::getRecord)
.groupBy(keyOfElement)
.windowedBy(SessionWindows.with(Duration.ofSeconds(10L)).grace(Duration.ofSeconds(10L)))
.aggregate(...do stuff...)
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map(this::createAggregatedResult);
}