Я хочу использовать данные из данных, которые запускаются в моем конвейере, чтобы сгенерировать запрос и выполнить его в BigQuery.
Допустим, у меня есть что-то похожее на этот шаблон Python SQL:
template = '''
SELECT
email
FROM
`project_id.dataset_id.table_id`
WHERE
email = {runtime_email}
'''
Я хочу отформатировать этот шаблон таким образом, чтобы runtime_email
исходил из данных (элемента) конвейера.
EG Конвейер читает из PubSub переменную runtime_email
с электронным письмом example@test.com
И я выполню что-то вроде:
with beam.Pipeline(options=options) as p:
bq_results = (p
| LoadDataFromPubSub()
| beam.io.Read(
beam.io.BigQuerySource(
query=template.format(element['runtime_email']),
use_standard_sql=True
)
)
)
Есть идеи о том, как использовать данные конвейера для запуска следующего шага?