запуск с фиксированными интервалами при потоковой передаче Apache - PullRequest
0 голосов
/ 11 мая 2018

Я использую Apache Beam для написания некоторых потоковых конвейеров.Одним из требований для моего варианта использования является то, что я хочу запускать каждые X минут относительно времени начала или окончания окна.Как я могу достичь этого.Текущий триггер AfterProcessingTime.pastFirstElementInPane () относится к времени обработки первого элемента в этом окне.

Например, я создал фиксированные 1-минутные окна, поэтому у меня есть window_1 (интервал 0-1 мин), window_2(Интервал 1 - 2 мин) и так далее.Теперь я хочу, чтобы результаты для каждого окна запускались ровно один раз через 10 минут с начала окна, то есть window_1 в 0 + 10 -> 10-й минуте, window_2 в 11-й минуте (1 + 10).[Примечание: я настраиваю фиксированные окна так, чтобы задержка составляла> 10 минут, чтобы элементы не сбрасывались при задержке]

Есть ли способ добиться такого рода запуска для фиксированного окна.

Я не могу просто назначить все элементы глобальному окну, а затем повторять триггер каждую минуту, потому что тогда он теряет информацию о времени окна всех элементов. Например, если в моем pcollection есть 2 элемента, которые принадлежат window_1 и window_2 на основе временной метки события,но были отложены на 3 и 3,2 минуты.Присвоение их глобальному окну приведет к некоторому выводу в конце 4-й минуты, принимая во внимание оба элемента, в то время как в действительности я хочу, чтобы они были назначены этому фактическому фиксированному окну (как поздние данные).

Я хочу, чтобы элементы были назначены для window_1 и window_2 на основе временной метки события, а затем для запуска window_1 с результатом вывода на 10-й минуте путем обработки только 1 поздних данных для этого окна и затем для запуска window_2 на 11-й минуте с выводомпосле обработки единственный пришедший элемент задержался на 3,2 минуты.Какими должны быть настройки триггера для достижения такого поведения в моем потоковом конвейере.

1 Ответ

0 голосов
/ 04 июля 2018

Я считаю, что следующий код работает для вас:

pcollection | WindowInto(
    FixedWindows(1 * 60).configure().withAllowedLateness(),
    trigger=AfterProcessingTime(9 * 60),

Размер окна составляет 1 минуту, и через 9 минут он запускает данные.Однако во многих случаях намного быстрее использовать скользящее окно, а затем позаботиться о дублированных обработанных элементах.Как упомянул AlexAmato, триггеры времени Watermark и AfterWatermark Event также должны работать здесь.

...