Я пытаюсь создать шаблон потока данных, который принимает входной параметр как RuntimeValue
. Следуя примеру , документы
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
# [START example_wordcount_templated]
class WordcountTemplatedOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
# Use add_value_provider_argument for arguments to be templatable
# Use add_argument as usual for non-templatable arguments
parser.add_value_provider_argument(
'--input', help='Path of the file to read from')
parser.add_argument(
'--output', required=True, help='Output file to write results to.')
pipeline_options = PipelineOptions(['--output', 'some/output_path'])
with beam.Pipeline(options=pipeline_options) as p:
wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)
lines = p | 'Read' >> ReadFromText(wordcount_options.input)
# [END example_wordcount_templated]
(взятые непосредственно из официальных фрагментов ) выдают следующую ошибку при попытке создать шаблон с помощью следующей команды ( с заполненными данными):
python -m examples.mymodule \
--runner DataflowRunner \
--project YOUR_PROJECT_ID \
--staging_location gs://YOUR_BUCKET_NAME/staging \
--temp_location gs://YOUR_BUCKET_NAME/temp \
--template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
File "lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 139, in _f
raise error.RuntimeValueProviderError('%s not accessible' % obj)
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: input, type: str, default_value: None) not
accessible
Документы также утверждают, что:
Некоторые соединители ввода / вывода содержат методы, которые принимают объекты ValueProvider , Чтобы определить поддержку соединителей ввода-вывода и их методов, см. Справочную документацию API для соединителя. Следующие соединители ввода / вывода принимают параметры времени выполнения:
На основе файла IOs: textio, avroio, tfrecordio
Я не уверен, почему код примера дает ошибки. Кто-нибудь может мне помочь?
Для чего стоит я использую:
apache-beam = {extras = ["gcp"], version = "^2.19.0"}