В настоящее время я работаю с Dataflow для выполнения периодической пакетной обработки в python.
По сути, я читаю данные из bigquery и выполняю на них какие-либо действия .. Мой конвейер выглядит так
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
lines = (p
| 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
| "doing stuff" >> beam.Map(do_some_stuff)
)
Я хочу запускать задания, используя шаблон потока данных, чтобы адаптировать его к среде выполнения.
Благодаря документации https://cloud.google.com/dataflow/docs/guides/templates/creating-templates, Использование ValueProvider в вашей части функций, мне удалось дать "do_some_stuff"дополнительный аргумент из среды выполнения с использованием ParDo.
class TemplateOption(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--template_do_stuff_param',
default=45,
type=int)
class MyDoStuffFn(beam.DoFn):
def __init__(self, template_do_stuff_param):
self.template_do_stuff_param = template_do_stuff_param
def process(self, *_):
yield do_some_stuff(self.template_do_stuff_param.get())
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)
lines = (p
| 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY),
use_standard_sql=True))
| "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
)
Но я также хочу изменить количество пользователей, вовлеченных в процесс, и поэтому я хочу адаптировать запрос к среде выполнения.
class TemplateOption(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--template_nb_users',
default=100,
type=int)
parser.add_value_provider_argument('--template_do_stuff_param',
default=45,
type=int)
class MyDoStuffFn(beam.DoFn):
def __init__(self, template_do_stuff_param):
self.template_do_stuff_param = template_do_stuff_param
def process(self, *_):
yield do_some_stuff(self.template_do_stuff_param.get())
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)
lines = (p
| 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY.format(nb_users=template_option.template_nb_users.get()),
use_standard_sql=True))
| "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
)
... Это не работает, потому что я вызываю get () перед выполнением конвейера. До сих пор мне не удавалось адаптировать то, что я сделал для функции do_some_stuff, к строке «Чтение»
Любой совет или решение о том, как действовать, было бы наиболее ценно. Спасибо!