Проблема Kafka KStreams в агрегации с временным окном - PullRequest
2 голосов
/ 29 мая 2019

У меня проблема с агрегацией KStreams и окнами. Я хочу объединить запись в список записей, которые имеют одинаковый ключ, если он попадает во временное окно. Я выбрал SessionWindows, потому что мне нужно работать с движущимся окном внутри сессии: скажем, запись A прибывает в 10:00:00; затем все остальные записи с тем же ключом, который прибывает внутри 10-секундного окна время (до 10:00:10) попадет в тот же сеанс, имея в виду, что если оно прибудет в 10:00:03, окно будет перемещаться до 10:00:13 (+ 10 с).

Это приводит к тому, что у нас есть движущееся окно + 10 с от последней записи, полученной для данного ключа.

Теперь проблема: я хочу получить последний агрегированный результат. Я использовал .suppress (), чтобы указать, что я не хочу никаких промежуточных результатов, я просто хочу последний, когда окно закрывается. это не работает нормально, потому что, хотя он не отправляет промежуточный агрегированный результат, когда заканчивается временное окно, я не получаю никакого результата. Я отметил, что для того, чтобы получить его, мне нужно опубликовать другой сообщение в тему, то, что в моем случае невозможно.

Читая о .suppress (), я пришел к выводу, что это может быть не тот путь, которого я хочу достичь, поэтому я задаюсь вопросом: как заставить окно закрыться и отправить последний агрегированный вычисленный результат?

@StreamListener(ExtractContractBinding.RECEIVE_PAGE)
@SendTo(ExtractCommunicationBinding.AGGREGATED_PAGES) 
public KStream<String, List<Records>> aggregatePages(KStream<?, Record> input) { 
    input.map(this::getRecord)
            .groupBy(keyOfElement)
            .windowedBy(SessionWindows.with(Duration.ofSeconds(10L)).grace(Duration.ofSeconds(10L)))
            .aggregate(...do stuff...)
            .suppress(Suppressed.untilWindowCloses(unbounded()))
            .toStream()
            .map(this::createAggregatedResult);
}

1 Ответ

0 голосов
/ 15 июня 2019

Я буду кратким здесь, потому что я отвечаю со своего телефона.

Короче говоря, причина, по которой это происходит, заключается в том, что в KStreams и большинстве других механизмов обработки потоков, которые вычисляют агрегации, время работает на основевремя события.

https://kafka.apache.org/0101/documentation/streams#streams_time

Другими словами, окно не может закрыться, пока новое сообщение не выйдет за пределы вашего временного окна + время отсрочки, которое учитывает опоздавшие сообщения.

Более того, основываясь на некоторых модульных тестах, которые я писал недавно, я склонен полагать, что второе сообщение должно находиться в том же разделе, что и предыдущее сообщение, для того, чтобы время события двигалось вперед.На практике, когда вы работаете в рабочей среде и, по-видимому, обрабатываете сотни сообщений в секунду, это становится незаметным.

Позвольте мне также добавить, что вы можете реализовать пользовательский экстрактор меток времени, который позволяет вам детально контролировать время в зависимости от того, какое временное окноконкретное сообщение.

как заставить окно закрыться и отправить последний агрегированный расчетный результат?

Чтобы окончательно ответить на ваш вопрос, невозможнопринудительно закрыть временное окно, не отправляя дополнительного сообщения в исходную тему.

...