Обновление:
Если вы используете WriteToBigQuery
, он автоматически создаст и
вставьте уникальный идентификатор строки с именем insertId
для вас, который будет вставлен в bigquery. Это обработано для вас, вам не нужно беспокоиться об этом. :)
WriteToBigQuery
- это PTransform
, а в его expand
вызовах методов BigQueryWriteFn
BigQueryWriteFn
- это DoFn
, а в его process
вызовах методов _flush_batch
_flush_batch
- это метод, который затем вызывает BigQueryWrapper.insert_rows
метод
BigQueryWrspper.insert_rows
создает список bigquery.TableDataInsertAllRequest.RowsValueListEntry
объектов, которые содержат insertId
и данные строки в виде объекта json
-
insertId
генерируется путем вызова метода unique_row_id
, который возвращает значение, состоящее из UUID4, объединенного с _
и с автоматически увеличивающимся числом.
В текущем коде 2.7.0 есть этот счастливый комментарий; Я также подтвердил, что это правда :)
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1182
# Prepare rows for insertion. Of special note is the row ID that we add to
# each row in order to help BigQuery avoid inserting a row multiple times.
# BigQuery will do a best-effort if unique IDs are provided. This situation
# can happen during retries on failures.
* Не использовать BigQuerySink
По крайней мере, не в его текущей форме, поскольку он не поддерживает потоковую передачу. Я думаю, это может измениться.
Оригинальный (не) ответ
Отличный вопрос, я тоже посмотрел и не смог найти определенный ответ.
Похоже, что Apache Beam не использует этот gd.cloud.bigquery клиент SDK, на который вы ссылались, у него есть какой-то внутренний сгенерированный API-клиент, но, похоже, он обновлен.
Я посмотрел на источник:
insertall
метод есть https://github.com/apache/beam/blob/18d2168ee71a1b1b04976717f0f955199bb00961/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py#L476
Я также нашел упомянутый insertid
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py#L1707
Так что, если вы можете сделать вызов InsertAll, он будет использовать TableDataInsertAllRequest
и передаст RowsValueListEntry
class TableDataInsertAllRequest(_messages.Message):
"""A TableDataInsertAllRequest object.
Messages:
RowsValueListEntry: A RowsValueListEntry object.
В сообщении RowsValueListEntry
указана вставка.
Вот документы API для вставки всех
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
Я еще посмотрю на это, потому что не вижу WriteToBigQuery()
, разоблачающего это.
Я подозреваю, что «bigquery запомнит это хотя бы на одну минуту» - довольно слабая гарантия для дупупинга. Документы предлагают использовать хранилище данных, если вам нужны транзакции. В противном случае вам может потребоваться запустить SQL с оконными функциями для дедупликации во время выполнения или запустить другие задания дедупликации в bigquery.
Возможно, использование batch_size
параметра WriteToBigQuery()
и запуск комбинированного (или в худшем случае шага GroupByKey) в потоке данных - более стабильный способ устранения ошибок перед записью.