Как мы можем реализовать триггер, который срабатывает после определенного количества в PCollection - PullRequest
0 голосов
/ 12 февраля 2019

Я пишу конвейер данных Beam для чтения из неограниченного источника, такого как Кафка.Я не выполняю никаких аналитических функций.Я хотел бы преобразовать элементы и записать в приемник, скажем, после того, как количество записей PCollection достигнет определенного порога.Это должно регулировать данные, отправляемые в приемник

Посмотрел существующие триггеры, но не смог понять, подходят ли они

1 Ответ

0 голосов
/ 21 февраля 2019

Я проверил триггеры, и они работают, как и ожидалось, вот пример кода scala

val data: PCollection[Type] = results
  .apply(
  Window
    .into[Type](FixedWindows.of(Duration.millis(2000)))
    .withAllowedLateness(Duration.millis(1000))
    .triggering(AfterPane.elementCountAtLeast(4)
    .accumulatingFiredPanes()
)

Он ждет 4 элемента и затем вызывает окно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...