Не совсем ясно, какое поведение вы ожидаете.Один вопрос: что вы ожидаете, если данные поступят в течение двух минут?Хотите перезапустить двухминутный интервал, не перезапускать его, переиздавать данные или нет?
Похоже, что триггер, который вы пытаетесь описать, выглядит примерно так:
- ожидание, пока водяной знак не достигнет конца окна, во время события;
- ожидание дополнительных 2 минут во время обработки;
- отправка данных;
Если на шаге 2 это было время события, т.е. вы хотели переиздать окно, если прибывает поздний элемент, который вписывается в window + 2min
, то вы можете использовать withAllowedLateness()
.Хотя он звучит не так, как вы хотите, потому что он может продолжать переиздавать содержимое окна каждый раз, когда приходит соответствующий поздний элемент.
Со временем обработки на шаге 2 это вообще невозможно в случае базовых триггеров, которыедоступны в Beam.Вы, вероятно, можете добиться желаемого поведения, если вы вручную управляете состоянием и таймерами в своем собственном ParDo
, например, вы можете наблюдать за входящими элементами, отслеживать их в состоянии, а затем по таймеруто, что ты хочешь.Это может стать очень сложным и, возможно, все еще не достаточно гибким для вашего конкретного случая использования.
Одна из основных проблем заключается в том, что в Beam вообще нет хорошего способа определения триггеров времени обработки.Было бы сложно определить общий механизм работы с таймерами таким образом.Например, когда вы хотите выразить "wait for 2 minutes"
, фреймворк должен понимать, что означают эти две минуты, когда запускать таймер, поэтому вам также нужен механизм для выражения этого.А с композицией, продолжением и другими сложностями это не так просто рассуждать.Так что это не в рамках в этом общем виде.
Чтобы реализовать только "wait for 2 minutes after the last element was seen in the window"
, структура должна следить за ним и устанавливать таймер.Технически возможно сделать что-то подобное, но, похоже, никто этого еще не сделал.
Кажется, в Beam имеется только один значимый триггер времени обработки, но он не является общимдостаточно и не делает то, что вы хотите.Вы можете посмотреть составные триггеры, такие как AfterFirst
или AfterAll
, но они, вероятно, не помогут вам без лучшего общего триггера времени обработки.