Активировать элементы ровно один раз, используя фиксированное окно с Apache Beam - PullRequest
0 голосов
/ 02 августа 2020

Я читаю данные из Google pub-sub и помещаю их в фиксированное окно продолжительностью 5 минут. Но - данные не срабатывают правильно. Я пробовал несколько комбинаций, похоже, ничего не работает. Это выглядит довольно просто, но я не могу понять это правильно.

Пример использования -

  1. Чтение данных из pub-sub
  2. Окно их в 5 минут
  3. Выполнять агрегирование после окончания 5-минутного окна.
  4. AllowedLateness период 1 день.

Попытки:

1. С помощью AfterWatermark.pastEndOfWindow для запуска. Это вообще не производит никакого вывода. Было прочитано около 1000 сообщений из подписки, но никаких сообщений в окне не выводилось.

Window.<EventModel>into(
                FixedWindows.of(Duration.standardMinutes(5)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.standardDays(1), Window.ClosingBehavior.FIRE_ALWAYS)
                .discardingFiredPanes();

2. Использование глобального окна: это работает правильно. Но здесь используется Global Windows, но мне нужно реализовать фиксированное оконное управление.

Window<EventModel> window = Window.<OrderEvent>
                into(new GlobalWindows())
                .triggering(
                        Repeatedly.forever( 
              AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.standardDays(1));

Я пробовал другие комбинации, которые используют - Early или Late Firings - которые запускают некоторые элементы, но не подходят для моего варианта использования - Мне не нужны ранние или поздние срабатывания - просто результаты нужны каждые 5 минут.

Любой ввод был бы действительно полезен, я потратил на это слишком много времени, но безуспешно.

1 Ответ

0 голосов
/ 02 августа 2020

Обнаружена проблема:

Это была ошибка DirectRunner. По какой-то причине - прямой бегун не продвигал водяной знак и, следовательно, ничего не запускалось.

Приведенный ниже код работал правильно - с Dataflow Runner - элементы запускались после конца окна.

Window<MyModel> window = Window.<MyModel>into(FixedWindows.of(Duration.standardMinutes(10)))
                    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                    .withAllowedLateness(Duration.standardDays(1))
                    .discardingFiredPanes();
...