Для моего потокового конвейера я вижу непредвиденное поведение при использовании параметра batch_size
для PTransform apache_beam.io.gcp.bigquery.WriteToBigQuery
? (источник - Cloud Pub / Sub, приемник - таблица BigQuery, пробовал как на DirectRunner, так и на DataFlowRunner, Python 3.7, apache-beam v2.15.0)
комментарии в коде говорят:
batch_size (int): Количество строк, которые должны быть записаны в BQ на одну потоковую вставку API. По умолчанию 500.
В моем коде конвейера я установил значение 100 в моем PTransform (значение по умолчанию 500)
pipeline | "read_sub" >> beam.io.gcp.pubsub.ReadFromPubSub(subscription=SUB)
| "extract_message" >> beam.Map(lambda m : json.loads(m.data)) \
| "convert_bq_format" >> beam.Map(lambda m : {"message" : str(m)}) \
| "write" >> bigquery.WriteToBigQuery(batch_size=100, project=PROJECT, dataset=BQ_DATASET, table=BQ_TABLE)
ОЖИДАЕМОЕ ПОВЕДЕНИЕ согласно моей интерпретации [поправьте меня, если я ошибаюсь в моей интерпретации]:
- ничего не должно быть записано в таблицу BQ, пока 100 элементов не будут опубликованы в pub / sub
- no StreamingМетод API
tabledata.insertAll
будет использован из API-интерфейса проекта. Даже проверил код здесь для process()
функции BigQueryWriteFn, которая вызывается WriteToBigQuery PTransform, и считаю, что моя интерпретация этого параметра имеет смысл (ничего не сбрасывается
АКТУАЛЬНОЕ ПОВЕДЕНИЕ , которое я вижу после публикации всего 10 элементов в пабе / подразделе:
- каждый из этих элементов успешно транслируетсяв таблицу BigQuery за считанные секунды
- Из квоты API проекта конвейер также потребовал 3 запроса к методу потоковой передачи bigquery.tabledata.insertAll (я бы по-прежнему искали т. д. 1, учитывая, что только 1 потоковый API может использоваться для потоковой передачи всего 10 сообщений с
batch_size =100
Может кто-нибудь уточнить, пожалуйста. почему я вижу это неожиданное поведение? или я что-то неправильно понял, как ведут себя настройки этого параметра?