Apache Beam Python Bigquery изменить потоковую вставку в пакетную вставку? - PullRequest
0 голосов
/ 11 января 2019

Я выполняю задание Apache Beam, которое читает данные из корзины, выполняет некоторые преобразования и пишет в bigquery. Но записи вставляются в потоковый буфер.

validated_data = (p1
                  | 'Read files from Storage '+url >> beam.io.ReadFromText(url)
                  | 'Validate records ' + url >> beam.Map(data_ingestion.validate, url)\
                  .with_outputs(SUCCESS_TAG_KEY, FAILED_TAG_KEY, main="main")
)
all_data, _, _ = validated_data
success_records = validated_data[SUCCESS_TAG_KEY]
failed_records = validated_data[FAILED_TAG_KEY]


(success_records
 | 'Extracting row from tagged row {}'.format(url) >> beam.Map(lambda row: row['row'])
 | 'Write to BigQuery table for {}'.format(url) >> beam.io.WriteToBigQuery(
            table=data_ingestion.get_table(tmp=TEST, run_date=data_ingestion.run_date),
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
)

На самом деле мне нужно удалить раздел перед запуском выше, чтобы избежать дублирования записей для секционированной таблицы времени приема.

И скажите, если я выполню это задание более одного раза для одного и того же файла, не обрезая таблицу, в результате в таблице появятся повторяющиеся записи.

И поскольку последние записи находятся в потоковом буфере, команда удаления таблицы разделов фактически не удаляет раздел. Ниже приведен код, который я использую для усечения таблицы. и этот код выполняется до запуска конвейера

client = bigquery.Client()
dataset = TABLE_MAP['dataset']
table = TABLE_MAP[sentiment_pipeline][table_type]['table']
table_id = "{}${}".format(table, format_date(run_date, '%Y%m%d'))
table_ref = client.dataset(dataset).table(table_id)
output = client.delete_table(table_ref)

1 Ответ

0 голосов
/ 31 июля 2019

Согласно документации BigQuery, вам может потребоваться подождать 30 минут, чтобы сделать оператор DML для потоковой таблицы , а изменения схемы, такие как удаление / усечение таблиц, могут привести к потере данных для некоторые сценарии . Здесь - некоторые обходные пути, которые вы можете попробовать использовать для устранения дубликатов в сценарии потоковой передачи.

Кроме того, Apache Beam и Поток данных теперь поддерживает пакетную вставку для Python, поэтому это может быть хорошим способом избежать ограничений потоковой передачи.

...