С точки зрения деталей реализации в DataflowRunner, многие люди могут не заботиться о том, используется ли BigQuerySink
или WriteToBigQuery
.
Однако в моем случае я пытаюсь развернуть свой код вшаблоны потока данных с RunTimeValueProvider для аргументов. Это поведение поддерживается в WriteToBigQuery
:
class WriteToBigQuery(PTransform):
....
table (str, callable, ValueProvider): The ID of the table, or a callable
that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
numbers ``0-9``, or underscores ``_``. If dataset argument is
:data:`None` then the table argument must contain the entire table
reference specified as: ``'DATASET.TABLE'``
or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
argument representing an element to be written to BigQuery, and return
a TableReference, or a string table name as specified above.
Multiple destinations are only supported on Batch pipelines at the
moment.
Это не поддерживается в BigQuerySink:
class BigQuerySink(dataflow_io.NativeSink):
table (str): The ID of the table. The ID must contain only letters
``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If
**dataset** argument is :data:`None` then the table argument must
contain the entire table reference specified as: ``'DATASET.TABLE'`` or
``'PROJECT:DATASET.TABLE'``.
Что еще интереснее, это BigQuerySink
вкод не рекомендуется с 2.11.0.
@deprecated(since='2.11.0', current="WriteToBigQuery")
Однако в DataFlowRunner текущий код и комментарии кажутся совершенно не соответствующими ожиданиям, что WriteToBigQuery
является классом по умолчанию для использования над BigQuerySink
:
def apply_WriteToBigQuery(self, transform, pcoll, options):
# Make sure this is the WriteToBigQuery class that we expected, and that
# users did not specifically request the new BQ sink by passing experiment
# flag.
# TODO(BEAM-6928): Remove this function for release 2.14.0.
experiments = options.view_as(DebugOptions).experiments or []
if (not isinstance(transform, beam.io.WriteToBigQuery)
or 'use_beam_bq_sink' in experiments):
return self.apply_PTransform(transform, pcoll, options)
if transform.schema == beam.io.gcp.bigquery.SCHEMA_AUTODETECT:
raise RuntimeError(
'Schema auto-detection is not supported on the native sink')
standard_options = options.view_as(StandardOptions)
if standard_options.streaming:
if (transform.write_disposition ==
beam.io.BigQueryDisposition.WRITE_TRUNCATE):
raise RuntimeError('Can not use write truncation mode in streaming')
return self.apply_PTransform(transform, pcoll, options)
else:
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
schema = None
if transform.schema:
schema = parse_table_schema_from_json(json.dumps(transform.schema))
return pcoll | 'WriteToBigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
transform.table_reference.tableId,
transform.table_reference.datasetId,
transform.table_reference.projectId,
schema,
transform.create_disposition,
transform.write_disposition,
kms_key=transform.kms_key))
У меня двоякие вопросы:
- Почему разница в контракте / ожиданиях между классами
DataflowRunner
и io.BigQuery
? - Безв ожидании исправления ошибки, есть ли у кого-нибудь предложения о том, как заставить
DataflowRunner
использовать WriteToBigQuery
сверх BigQuerySink
?