Поток данных: обновление строк BigQuery с помощью конвейера Python - PullRequest
0 голосов
/ 05 декабря 2018

Представьте себе простой конвейер данных Google.В этом конвейере вы читаете из BQ, используя функцию луча apache, и в зависимости от возвращенного pcollection вы должны обновить эти строки

Journeys = (p
                    | 'Read from BQ' >> beam.io.Read(
                    beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True)))

Update = ( Journeys
                   | 'Updating Journey Table' >> beam.Map(UpdateBQ))

Write = (Journeys
                    | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))

Проблема этого конвейера состоит в том, что UpdateBQ выполняется для каждого элемента в возвращенном pcollection, когдавы читаете таблицу (beam.Map)


Какой может быть лучший способ выполнить обновление в таблице BigQuery?

Я полагаю, это может бытьсделано без использования beam.Map и только для выполнения и обновления, которые обрабатывают всю входную коллекцию сразу.


Extra

def UpdateBQ(input):
    from google.cloud import bigquery
    import uuid
    import time
    client = bigquery.Client()
    STD = "#standardSQL"
    QUERY = STD + "\n" + """UPDATE table SET Field= 'YYY' WHERE Field2='XXX'"""
    client.use_legacy_sql = False
    query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.begin()
    <...>

Возможное решение

with beam.Pipeline(options=options) as p:
    Journeys = (p
                | 'Read from BQ' >> beam.io.Read(
                beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True))
                )

    Write = (Journeys
                | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))


UpdateBQ();

1 Ответ

0 голосов
/ 05 декабря 2018

Проводите ли вы какие-либо дальнейшие преобразования с использованием конвейера лучей после чтения из BQ?Или это просто так, как вы показали в коде, то есть чтение из BQ, а затем запустить команду обновления в BQ?В этом случае вам не нужен луч вообще.Просто используйте запрос BQ, чтобы обновить данные в таблице, используя другую таблицу. Рекомендации BQ предлагают избегать вставки / обновления одной строки за раз.

...