Python Apache Beam: потоковая дедупликация BigQuery по row_id - PullRequest
0 голосов
/ 28 июня 2018

Согласно документации BigQuery, вы можете обеспечить согласованность данных, предоставив insertId (https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency).. Если это не предусмотрено, BQ попытается обеспечить согласованность, основанную на внутренних идентификаторах и максимальных усилиях.

Используя BQ API, вы можете сделать это с помощью параметра row_ids (https://google -cloud-python.readthedocs.io / en / latest / bigquery / Generated / google.cloud.bigquery.client.Client. .insert_rows_json.html # google.cloud.bigquery.client.Client.insert_rows_json ), но я не могу найти то же самое для Apache Beam Python SDK.

Просматривая SDK, я заметил, что существует свойство «unique_row_id», но я действительно не знаю, как передать свой параметр в WriteToBigQuery()

Как я могу записать в BQ (потоковая передача), предоставляя идентификатор строки для дедупликации?

1 Ответ

0 голосов
/ 10 октября 2018

Обновление:

Если вы используете WriteToBigQuery, он автоматически создаст и вставьте уникальный идентификатор строки с именем insertId для вас, который будет вставлен в bigquery. Это обработано для вас, вам не нужно беспокоиться об этом. :)

  1. WriteToBigQuery - это PTransform, а в его expand вызовах методов BigQueryWriteFn
  2. BigQueryWriteFn - это DoFn, а в его process вызовах методов _flush_batch
  3. _flush_batch - это метод, который затем вызывает BigQueryWrapper.insert_rows метод
  4. BigQueryWrspper.insert_rows создает список bigquery.TableDataInsertAllRequest.RowsValueListEntry объектов, которые содержат insertId и данные строки в виде объекта json
  5. insertId генерируется путем вызова метода unique_row_id, который возвращает значение, состоящее из UUID4, объединенного с _ и с автоматически увеличивающимся числом.

В текущем коде 2.7.0 есть этот счастливый комментарий; Я также подтвердил, что это правда :) https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1182

# Prepare rows for insertion. Of special note is the row ID that we add to
# each row in order to help BigQuery avoid inserting a row multiple times.
# BigQuery will do a best-effort if unique IDs are provided. This situation
# can happen during retries on failures.

* Не использовать BigQuerySink

По крайней мере, не в его текущей форме, поскольку он не поддерживает потоковую передачу. Я думаю, это может измениться.


Оригинальный (не) ответ

Отличный вопрос, я тоже посмотрел и не смог найти определенный ответ.

Похоже, что Apache Beam не использует этот gd.cloud.bigquery клиент SDK, на который вы ссылались, у него есть какой-то внутренний сгенерированный API-клиент, но, похоже, он обновлен.

Я посмотрел на источник: insertall метод есть https://github.com/apache/beam/blob/18d2168ee71a1b1b04976717f0f955199bb00961/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py#L476

Я также нашел упомянутый insertid https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py#L1707

Так что, если вы можете сделать вызов InsertAll, он будет использовать TableDataInsertAllRequest и передаст RowsValueListEntry

class TableDataInsertAllRequest(_messages.Message):
  """A TableDataInsertAllRequest object.
  Messages:
    RowsValueListEntry: A RowsValueListEntry object.

В сообщении RowsValueListEntry указана вставка.

Вот документы API для вставки всех https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll

Я еще посмотрю на это, потому что не вижу WriteToBigQuery(), разоблачающего это.

Я подозреваю, что «bigquery запомнит это хотя бы на одну минуту» - довольно слабая гарантия для дупупинга. Документы предлагают использовать хранилище данных, если вам нужны транзакции. В противном случае вам может потребоваться запустить SQL с оконными функциями для дедупликации во время выполнения или запустить другие задания дедупликации в bigquery.

Возможно, использование batch_size параметра WriteToBigQuery() и запуск комбинированного (или в худшем случае шага GroupByKey) в потоке данных - более стабильный способ устранения ошибок перед записью.

...