Apache Beam - как вызвать пустые окна? - PullRequest
0 голосов
/ 30 января 2019

Я пытаюсь выполнить простой подсчет количества прогонов в Apache Beam (Dataflowrunner), но мне трудно найти подходящее решение.

Цель конвейера:

  • Вход Pubsub: события
  • Выход Pubsub: каждые 30 секунд, число событий, замеченных за последние 5 минут

Проблема:

, когда естьпустое окно, я хочу видеть 0 в качестве счета.Этого не происходит.

Попытка 1: Подсчет глобально

Выдает ошибку, которую мне нужно использовать:

Combine.globally(Count.<T>combineFn()).withoutDefaults() 
PCollection slidingCount =
     input.apply("Create Windows",
         Window.into(
             SlidingWindows.of(Duration.standardSeconds(300))
             .every(Duration.standardSeconds(30)))
          )
          .apply("Count elements in window",
             Count.globally()
          );

Попытка 2: Combiner

Выдает ошибку, которую мне нужно указать, withoutdefaults().

PCollection slidingCount =
     input.apply("Create Windows",
         Window.into(
             SlidingWindows.of(Duration.standardSeconds(300))
             .every(Duration.standardSeconds(30)))
          )
          .apply("Count elements in window",
              Combine.globally(Count.<PubsubMessage>combineFn())
          );

Попытка 3: без ошибок

Это не генерирует пустые окна.По определению конечно.

PCollection slidingCount =
     input.apply("Create Windows",
         Window.into(
             SlidingWindows.of(Duration.standardSeconds(300))
             .every(Duration.standardSeconds(30)))
          )
          .apply("Count elements in window",
              Combine.globally(Count.<PubsubMessage>combineFn())
              .withoutDefaults()
          );

Попытка 4: asSingletonView

Добавление этой опции превращает мою коллекцию в представление, которое я не могу повторно использовать для продолжения моего конвейера.

PCollectionView slidingCount =
     input.apply("Create Windows",
         Window.into(
             SlidingWindows.of(Duration.standardSeconds(300))
             .every(Duration.standardSeconds(30)))
          )
          .apply("Count elements in window",
              Combine.globally(Count.<PubsubMessage>combineFn())
              .asSingletonView()
          );

Любые предложения о том, как я могу подойти к этому.

1 Ответ

0 голосов
/ 07 февраля 2019

Вы можете использовать таймеры и состояния с глобальным окном, если вам нужно генерировать результат каждые 30 секунд, даже если с момента последнего запуска не было никакого события.Я думаю, что это невозможно сделать с помощью встроенных триггеров и окон.

Вы можете сохранять счетчик каждые полминуты в состоянии и использовать таймер, чтобы периодически выводить результаты и отбрасывать ненужные значения.Эти два сообщения в блоге объясняют использование таймеров и состояния:

Обработка с учетом состояния с использованием Apache Beam

Своевременная (и с учетом состояния) обработка с использованием Apache Beam

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...