Ошибка при подключении к Google Cloud BigQuery из Apache Beam Dataflow в Python? - PullRequest
0 голосов
/ 04 июня 2019

При попытке инициализировать Python BigQuery Client () в облачном потоке данных apache beam Google выдает ошибку типа:

TypeError('__init__() takes 2 positional arguments but 3 were given')

Я использую Python 3.7 с потоком данных Apache Beam, и мне нужно инициализировать клиент и писать в BigQuery вручную вместо использования ptransform, потому что я хочу использовать динамическое имя таблицы, которое передается через параметры времени выполнения.

Я пытался передать проект и учетные данные клиенту, но, похоже, он ничего не сделал. Более того, если я использую google-cloud-bigquery == 1.11.2 вместо 1.13.0, он работает нормально, также использование 1.13.0 вне apache beam также работает полностью нормально.

Я явно вырезал немного кода, но это по сути то, что выдает ошибку

class SaveObjectsBigQuery(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # Establish BigQuery client
        client = bigquery.Client(project=project)


def run():
    pipeline_options = PipelineOptions()

    # GoogleCloud options object
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)

    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        _data = (p
                 | "Create" >> beam.Create(["Start"])
                 )

        save_data_bigquery = _data | "Save to BigQuery" >> beam.ParDo(SaveObjectsBigQuery())

В более ранних версиях google-cloud-bigquery это работает нормально, и я могу без проблем создать таблицу с параметром времени выполнения и insert_rows_json. Очевидно, что использование WriteToBigquery Ptransform было бы идеально, но это невозможно из-за необходимости динамического именования таблиц bigquery.

EDIT:

Я обновил код, чтобы попытаться убрать провайдер значений времени выполнения и лямбда-функцию, хотя и получил похожую ошибку для обоих:

Объект `AttributeError: 'function / RuntimeValueProvider' не имеет атрибута 'tableId'

По сути, я пытаюсь использовать провайдер значений времени выполнения при запуске шаблона потока данных для динамического именования таблицы больших запросов с использованием PTT-преобразования WriteToBigQuery.

save_data_bigquery = _data | WriteToBigQuery(
            project=project,
            dataset="campaign_contact",
            table=value_provider.RuntimeValueProvider(option_name="table", default_value=None, value_type=str),
            schema="id:STRING",
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=BigQueryDisposition.WRITE_APPEND
        )
save_data_bigquery = _data | WriteToBigQuery(
            table=lambda table: f"{project}:dataset.{runtime_options.table}",
            schema="id:STRING",
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=BigQueryDisposition.WRITE_APPEND
        )

1 Ответ

1 голос
/ 04 июня 2019

Начиная с Луча 2.12, вы можете использовать преобразование WriteToBigQuery для динамического назначения адресатов.Я бы порекомендовал вам попробовать:)

Проверьте этот тест в кодовой базе Beam, который показывает пример этого.

...