Flink окно сеанса с триггером onEventTime? - PullRequest
0 голосов
/ 24 марта 2019

Я хочу создать сеансовое окно на основе EventTime во Flink, чтобы оно срабатывало, когда время события нового сообщения более чем на 180 секунд больше времени события сообщения, создавшего окно.

Например:

t1(0 seconds)  : msg1  <-- This is the first message which causes the session-windows to be created
t2(13 seconds) : msg2
t3(39 seconds) : msg3
.
.
.
.
t7(190 seconds) : msg7 <-- The event time (t7) is more than 180 seconds than t1 (t7 - t1 = 190), so the window should be triggered and processed now.
t8(193 seconds) : msg8 <-- This message, and all subsequent messages have to be ignored as this window was processed at t7

Я хочу создать триггер так, чтобы вышеуказанное поведение достигалось с помощью соответствующего водяного знака или триггера onEventTime.Кто-нибудь может привести несколько примеров для достижения этой цели?

1 Ответ

1 голос
/ 27 марта 2019

Лучшим способом решения этой проблемы может быть использование ProcessFunction, а не настраиваемое управление окнами. Если, как показано в вашем примере, события будут обрабатываться в порядке отметок времени, то это будет довольно просто. Если, с другой стороны, вам нужно обрабатывать неупорядоченные события (что часто случается при работе с данными времени события), это будет несколько сложнее. (Представьте, что сообщение msg6 с временем 187 приходит после t8. Если это возможно и если это повлияет на результаты, которые вы хотите получить, то это нужно обработать.)

Если события в порядке, тогда логика будет выглядеть примерно так:

Используйте AscendingTimestampExtractor в качестве основы для водяных знаков.

Используйте состояние Flink (возможно, ListState) для хранения содержимого окна. Когда событие приходит, добавьте его в окно и проверьте, прошло ли оно более 180 секунд с момента первого события. Если это так, обработайте содержимое окна и очистите список.

Если ваши события могут быть не в порядке, используйте BoundedOutOfOrdernessTimestampExtractor и не обрабатывайте содержимое окна до тех пор, пока currentWatermark не покажет, что время события прошло через 180 секунд после времени начала окна (вы можете использовать таймер времени события за это). Не полностью очищайте список при запуске окна, а просто удалите элементы, принадлежащие закрывающемуся окну.

...