Apache Beam - добавление задержки в конвейер - PullRequest
0 голосов
/ 27 марта 2020

У меня есть простой конвейер, который читает из Pub Sub topi c и пишет в BigQuery. Я хотел бы ввести 5-минутную задержку между чтением сообщения из topi c и записью его в BQ.

Я подумал, что мог бы сделать это с помощью триггера, аналогично приведенному ниже, однако сообщение по-прежнему проходит без задержки.

PCollection windowed_inputEvents = inputEvents.apply (Window.into (Fixed Windows .of (Duration.standardMinutes (1))). Duration.standardMinutes (5))). WithAllowedLateness (Duration.standardMinutes (1)). DiscardingFiredPanes ());

Возможно ли создать такую ​​задержку с помощью триггеров?

Спасибо

1 Ответ

0 голосов
/ 30 марта 2020

Похоже, вы перепутали несколько вещей. В вашем примере у вас фиксированное окно продолжительностью 1 минута, что означает, что в конце окна все элементы данных, которые являются частью окна, излучаются.

Триггеры - это, по сути, дополнительные рычаги, которые вы можете использовать для излучения данные до закрытия окна. Триггеры не могут содержать данные после закрытия периода. Например, если окно находится между 12:00 и 12:01, и если первый элемент появляется в 12:00, то в момент закрытия окна в 12:01, элемент испускается, он не задерживается до 12: 05.

Чтобы удовлетворить ваши требования, вы можете сделать несколько вещей: -

  1. Увеличьте размер периода окна так, чтобы он был длиннее периода хранения, и затем вы можете создавать элементы данных с задержкой.
  2. Если это невозможно в BigqueryIO, существует метод FILE_LOADS, который можно использовать для записи данных в Bigquery в пакетах, и этот API может также поддерживать длительность времени, используя withTriggeringFrequency. Более подробную информацию можно найти здесь - https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency -org.joda.time.Duration-
...