Пакетная вставка в Bigquery с потоком данных - PullRequest
0 голосов
/ 04 января 2019

Я использую конвейер луча apache и хочу выполнить пакетную вставку в bigquery с помощью python. Мои данные поступают из Pub / Sub, который не ограничен. В результате моего исследования, GlobalWindows с триггерами должна решить мою проблему. Я попробовал свой конвейер с оконным управлением, но он все еще выполняет потоковую вставку. Мой конвейерный код выглядит следующим образом:

p2 = (p | 'Read ' >> beam.io.ReadFromPubSub(subscription=subscription_path,
    with_attributes=True,
    timestamp_attribute=None,id_label=None)
       | 'Windowing' >>  beam.WindowInto(window.GlobalWindows(),
           trigger=Repeatedly(
                   AfterAny(
                AfterCount(100),
           AfterProcessingTime(1 * 60))), 
        accumulation_mode=AccumulationMode.DISCARDING)
      | 'Process ' >> beam.Map(getAttributes))
p3 = (p2 | 'Filter ' >> beam.Filter(lambda msg: (("xx" in msg) and (msg["xx"].lower() == "true")))
         | 'Delete ' >> beam.Map(deleteAttribute)
         | 'Write '  >> writeTable(bq_table_test, bq_batch_size))

def writeTable(table_name):
return beam.io.WriteToBigQuery(
    table=table_name,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    batch_size=100)

Я проверяю из отчетов по выставлению счетов, что вставки являются пакетными или потоковыми. Когда увеличивается использование вставки Streming, я понимаю, что массовая вставка не произошла. Есть ли другая функция, которую я могу проверить, была ли вставка потоковой или пакетной? А также, как я могу сделать пакетную вставку в BigQuery?

1 Ответ

0 голосов
/ 04 января 2019

Согласно документации вы не можете указать тип вставки, он автоматически определяется на основе вашего ввода PCollection:

Beam SDK для Python в настоящее время не поддерживаетуказание метода вставки.

BigQueryIO поддерживает два метода вставки данных в BigQuery: загрузка заданий и потоковая вставка.Каждый метод вставки обеспечивает различные компромиссы стоимости, квоты и согласованности данных.См. Документацию BigQuery для заданий загрузки и потоковых вставок для получения дополнительной информации об этих компромиссах.

BigQueryIO выбирает метод вставки по умолчанию на основе входной PCollection.

BigQueryIO использует задания загрузки при применении BigQueryIOпреобразование записи в ограниченную коллекцию PC.

BigQueryIO использует потоковые вставки при применении преобразования записи BigQueryIO к неограниченной коллекции PCol.

В вашем случае вы читаете из неограниченного источника (Pubsub), поэтому он всегда записывает поток в этом случае.Работа с окнами не изменит характер данных.

Один из обходных путей, который я могу придумать, - это разделить конвейер, например, потоковый конвейер записывает данные в коллекцию файлов в некотором хранилище (GCS), а затем другой конвейерчитать и загружать эти файлы (файлы ограничены).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...