используйте ValueProvider для форматирования BigQuery в потоке данных - PullRequest
0 голосов
/ 21 октября 2019

В настоящее время я работаю с Dataflow для выполнения периодической пакетной обработки в python.

По сути, я читаю данные из bigquery и выполняю на них какие-либо действия .. Мой конвейер выглядит так

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
             | "doing stuff" >> beam.Map(do_some_stuff)
             )

Я хочу запускать задания, используя шаблон потока данных, чтобы адаптировать его к среде выполнения.

Благодаря документации https://cloud.google.com/dataflow/docs/guides/templates/creating-templates, Использование ValueProvider в вашей части функций, мне удалось дать "do_some_stuff"дополнительный аргумент из среды выполнения с использованием ParDo.


class TemplateOption(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--template_do_stuff_param',
                                           default=45,
                                           type=int)
class MyDoStuffFn(beam.DoFn):
    def __init__(self, template_do_stuff_param):
      self.template_do_stuff_param = template_do_stuff_param

    def process(self, *_):
      yield do_some_stuff(self.template_do_stuff_param.get())


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY),
                                                                                     use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

Но я также хочу изменить количество пользователей, вовлеченных в процесс, и поэтому я хочу адаптировать запрос к среде выполнения.


class TemplateOption(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--template_nb_users',
                                           default=100,
                                           type=int)
        parser.add_value_provider_argument('--template_do_stuff_param',
                                           default=45,
                                           type=int)
class MyDoStuffFn(beam.DoFn):
    def __init__(self, template_do_stuff_param):
      self.template_do_stuff_param = template_do_stuff_param

    def process(self, *_):
      yield do_some_stuff(self.template_do_stuff_param.get())


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY.format(nb_users=template_option.template_nb_users.get()),
                                                                                     use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

... Это не работает, потому что я вызываю get () перед выполнением конвейера. До сих пор мне не удавалось адаптировать то, что я сделал для функции do_some_stuff, к строке «Чтение»

Любой совет или решение о том, как действовать, было бы наиболее ценно. Спасибо!

1 Ответ

2 голосов
/ 23 октября 2019

К сожалению, BigQuerySource не поддерживает поставщиков стоимости. Это связано с тем, что он изначально реализован в средстве выполнения потоков данных, и, следовательно, вся информация должна быть доступна во время строительства конвейера.

В будущем мы разработаем BigQuerySource, который поддерживает поставщиков значений, но, к сожалению, этов данный момент недоступно. : (

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...