Используйте данные конвейера для запроса BigQuery apache_beam - PullRequest
0 голосов
/ 06 октября 2019

Я хочу использовать данные из данных, которые запускаются в моем конвейере, чтобы сгенерировать запрос и выполнить его в BigQuery.

Допустим, у меня есть что-то похожее на этот шаблон Python SQL:

template = '''
SELECT
  email
FROM
  `project_id.dataset_id.table_id`
WHERE
  email = {runtime_email}
'''

Я хочу отформатировать этот шаблон таким образом, чтобы runtime_email исходил из данных (элемента) конвейера.

EG Конвейер читает из PubSub переменную runtime_email с электронным письмом example@test.com

И я выполню что-то вроде:

with beam.Pipeline(options=options) as p:
    bq_results = (p
        | LoadDataFromPubSub()
        | beam.io.Read(
            beam.io.BigQuerySource(
                query=template.format(element['runtime_email']),
                use_standard_sql=True
            )
        )
    )

Есть идеи о том, как использовать данные конвейера для запуска следующего шага?

1 Ответ

1 голос
/ 06 октября 2019

Способ построения вашего конвейера неверен. Имейте в виду, что Beam строит график, а затем выполняет его.

Здесь вы определили 2 источника, 1 PubSub, 1 BigQuery. Источник BQ инициализируется до запуска вашего конвейера. Кстати, ваш runtime_email всегда будет None.

У вас есть 2 решения:

  • Прочитайте ваш PubSub до запуска вашего конвейера. Вы можете сделать это в своем коде Python или извне и предоставить данные в pipeline_options. Затем выполните итерацию по всем сообщениям Pubsub и соберите столько источников BQ, сколько у вас есть сообщений.
  • Сохраните ваш источник PubSub в конвейере и выполните стандартный вызов BQ с библиотекой python, а не с лучом, для чтения строк. Это рекомендуемый способ, если вы хотите передавать данные.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...