При попытке инициализировать Python BigQuery Client () в облачном потоке данных apache beam Google выдает ошибку типа:
TypeError('__init__() takes 2 positional arguments but 3 were given')
Я использую Python 3.7 с потоком данных Apache Beam, и мне нужно инициализировать клиент и писать в BigQuery вручную вместо использования ptransform, потому что я хочу использовать динамическое имя таблицы, которое передается через параметры времени выполнения.
Я пытался передать проект и учетные данные клиенту, но, похоже, он ничего не сделал. Более того, если я использую google-cloud-bigquery == 1.11.2 вместо 1.13.0, он работает нормально, также использование 1.13.0 вне apache beam также работает полностью нормально.
Я явно вырезал немного кода, но это по сути то, что выдает ошибку
class SaveObjectsBigQuery(beam.DoFn):
def process(self, element, *args, **kwargs):
# Establish BigQuery client
client = bigquery.Client(project=project)
def run():
pipeline_options = PipelineOptions()
# GoogleCloud options object
cloud_options = pipeline_options.view_as(GoogleCloudOptions)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
_data = (p
| "Create" >> beam.Create(["Start"])
)
save_data_bigquery = _data | "Save to BigQuery" >> beam.ParDo(SaveObjectsBigQuery())
В более ранних версиях google-cloud-bigquery это работает нормально, и я могу без проблем создать таблицу с параметром времени выполнения и insert_rows_json. Очевидно, что использование WriteToBigquery Ptransform было бы идеально, но это невозможно из-за необходимости динамического именования таблиц bigquery.
EDIT:
Я обновил код, чтобы попытаться убрать провайдер значений времени выполнения и лямбда-функцию, хотя и получил похожую ошибку для обоих:
Объект `AttributeError: 'function / RuntimeValueProvider' не имеет атрибута 'tableId'
По сути, я пытаюсь использовать провайдер значений времени выполнения при запуске шаблона потока данных для динамического именования таблицы больших запросов с использованием PTT-преобразования WriteToBigQuery.
save_data_bigquery = _data | WriteToBigQuery(
project=project,
dataset="campaign_contact",
table=value_provider.RuntimeValueProvider(option_name="table", default_value=None, value_type=str),
schema="id:STRING",
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND
)
save_data_bigquery = _data | WriteToBigQuery(
table=lambda table: f"{project}:dataset.{runtime_options.table}",
schema="id:STRING",
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND
)