Beam Python Data Flow Runner Использует устаревший BigQuerySink вместо WriteToBigQuery в apply_WriteToBigQuery - PullRequest
1 голос
/ 04 октября 2019

С точки зрения деталей реализации в DataflowRunner, многие люди могут не заботиться о том, используется ли BigQuerySink или WriteToBigQuery.

Однако в моем случае я пытаюсь развернуть свой код вшаблоны потока данных с RunTimeValueProvider для аргументов. Это поведение поддерживается в WriteToBigQuery:

class WriteToBigQuery(PTransform):
....

 table (str, callable, ValueProvider): The ID of the table, or a callable
         that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
         numbers ``0-9``, or underscores ``_``. If dataset argument is
         :data:`None` then the table argument must contain the entire table
         reference specified as: ``'DATASET.TABLE'``
         or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
         argument representing an element to be written to BigQuery, and return
         a TableReference, or a string table name as specified above.
         Multiple destinations are only supported on Batch pipelines at the
         moment.

Это не поддерживается в BigQuerySink:

class BigQuerySink(dataflow_io.NativeSink):
      table (str): The ID of the table. The ID must contain only letters
        ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If
        **dataset** argument is :data:`None` then the table argument must
        contain the entire table reference specified as: ``'DATASET.TABLE'`` or
        ``'PROJECT:DATASET.TABLE'``.

Что еще интереснее, это BigQuerySink вкод не рекомендуется с 2.11.0.

@deprecated(since='2.11.0', current="WriteToBigQuery")

Однако в DataFlowRunner текущий код и комментарии кажутся совершенно не соответствующими ожиданиям, что WriteToBigQuery является классом по умолчанию для использования над BigQuerySink:

  def apply_WriteToBigQuery(self, transform, pcoll, options):
    # Make sure this is the WriteToBigQuery class that we expected, and that
    # users did not specifically request the new BQ sink by passing experiment
    # flag.

    # TODO(BEAM-6928): Remove this function for release 2.14.0.
    experiments = options.view_as(DebugOptions).experiments or []
    if (not isinstance(transform, beam.io.WriteToBigQuery)
        or 'use_beam_bq_sink' in experiments):
      return self.apply_PTransform(transform, pcoll, options)
    if transform.schema == beam.io.gcp.bigquery.SCHEMA_AUTODETECT:
      raise RuntimeError(
          'Schema auto-detection is not supported on the native sink')
    standard_options = options.view_as(StandardOptions)
    if standard_options.streaming:
      if (transform.write_disposition ==
          beam.io.BigQueryDisposition.WRITE_TRUNCATE):
        raise RuntimeError('Can not use write truncation mode in streaming')
      return self.apply_PTransform(transform, pcoll, options)
    else:
      from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
      schema = None
      if transform.schema:
        schema = parse_table_schema_from_json(json.dumps(transform.schema))
      return pcoll  | 'WriteToBigQuery' >> beam.io.Write(
          beam.io.BigQuerySink(
              transform.table_reference.tableId,
              transform.table_reference.datasetId,
              transform.table_reference.projectId,
              schema,
              transform.create_disposition,
              transform.write_disposition,
              kms_key=transform.kms_key))

У меня двоякие вопросы:

  1. Почему разница в контракте / ожиданиях между классами DataflowRunner и io.BigQuery?
  2. Безв ожидании исправления ошибки, есть ли у кого-нибудь предложения о том, как заставить DataflowRunner использовать WriteToBigQuery сверх BigQuerySink?

1 Ответ

1 голос
/ 04 октября 2019

Преобразование WriteToBigQuery имеет две разные стратегии записи в BigQuery:

  • Потоковые вставки в конечную точку BigQuery
  • Задания загрузки файлов запускаются периодически (или один раз для пакетных конвейеров)

Для Python SDK у нас изначально была только поддержка потоковых вставок, и у нас была встроенная реализация для загрузки файлов, которая работала только с потоком данных (это BigQuerySink).

Для пакетных конвейеров, работающих в потоке данных, BigQuerySink заменяется на - как вы правильно обнаружили. Для всех остальных случаев использовались потоковые вставки.

В последней версии Beam мы добавили встроенную поддержку загрузки файлов в SDK - реализация этого в BigQueryBatchFileLoads.

Поскольку мы не хотели нарушать работу пользователей, полагаясь на старое поведение, мы скрывали BigQueryBatchFileLoads за флагом эксперимента. (флаг use_beam_bq_sink).

Итак:

  • В будущей версии мы будем автоматически использовать BigQueryBatchFileLoads, но на данный момент у вас есть два вариантачтобы иметь к нему доступ:

    1. Используйте его прямо в вашем конвейере (например, input | BigQueryBatchFileLoads(...)).
    2. Передайте опцию --experiments use_beam_bq_sink при использовании WriteToBigQuery.

Надеюсь, это поможет!

...