Представьте себе простой конвейер данных Google.В этом конвейере вы читаете из BQ, используя функцию луча apache, и в зависимости от возвращенного pcollection вы должны обновить эти строки
Journeys = (p
| 'Read from BQ' >> beam.io.Read(
beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True)))
Update = ( Journeys
| 'Updating Journey Table' >> beam.Map(UpdateBQ))
Write = (Journeys
| 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))
Проблема этого конвейера состоит в том, что UpdateBQ выполняется для каждого элемента в возвращенном pcollection, когдавы читаете таблицу (beam.Map)
Какой может быть лучший способ выполнить обновление в таблице BigQuery?
Я полагаю, это может бытьсделано без использования beam.Map и только для выполнения и обновления, которые обрабатывают всю входную коллекцию сразу.
Extra
def UpdateBQ(input):
from google.cloud import bigquery
import uuid
import time
client = bigquery.Client()
STD = "#standardSQL"
QUERY = STD + "\n" + """UPDATE table SET Field= 'YYY' WHERE Field2='XXX'"""
client.use_legacy_sql = False
query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4())) # API request
query_job.begin()
<...>
Возможное решение
with beam.Pipeline(options=options) as p:
Journeys = (p
| 'Read from BQ' >> beam.io.Read(
beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True))
)
Write = (Journeys
| 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))
UpdateBQ();