Шаблон потока данных Python, делающий параметры времени выполнения глобально доступными - PullRequest
0 голосов
/ 05 сентября 2018

Таким образом, цель конвейера состоит в том, чтобы иметь возможность использовать переменные времени выполнения, чтобы иметь возможность открывать CSV-файл и называть таблицу BigQuery.

Все, что мне нужно, - это иметь доступ к этим переменным глобально или внутри ParDo, например, для анализа их в функции.

Я попытался создать фиктивную строку, а затем запустить FlatMap для доступа к параметрам времени выполнения и сделать их глобальными, хотя ничего не возвращается.

например.

class CustomPipelineOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--path',
            type=str,
            help='csv storage path')
        parser.add_value_provider_argument(
            '--table_name',
            type=str,
            help='Table Id')
def run()
    def rewrite_values(element):
        """ Rewrite default env values"""
        # global project_id
        # global campaign_id
        # global organization_id
        # global language
        # global file_path
        try:
            logging.info("File Path with str(): {}".format(str(custom_options.path)))
            logging.info("----------------------------")
            logging.info("element: {}".format(element))
            project_id = str(cloud_options.project)
            file_path = custom_options.path.get()
            table_name = custom_options.table_name.get()

            logging.info("project: {}".format(project_id))
            logging.info("File path: {}".format(file_path))
            logging.info("language: {}".format(table_name))
            logging.info("----------------------------")
        except Exception as e:
            logging.info("Error format----------------------------")
            raise KeyError(e)

        return file_path

    pipeline_options = PipelineOptions()
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Beginning of the pipeline
    p = beam.Pipeline(options=pipeline_options)

    init_data = (p
                 | beam.Create(["Start"])
                 | beam.FlatMap(rewrite_values))

pipeline processing, running pipeline etc.

Я могу получить доступ к переменной проекта без проблем, хотя все остальное возвращается пустым.

Если я сделаю переменную custom_options глобальной или когда передам конкретный таможенный объект в функцию, такую ​​как: | 'Read data' >> beam.ParDo(ReadGcsBlobs(path_file=custom_options.path)), он вернет только что-то, например RuntimeValueProvider(option: path, type: str, default_value: None).

Если я использую | 'Read data' >> beam.ParDo(ReadGcsBlobs(path_file=custom_options.path.get())), переменная и пустая строка.

По сути, мне просто нужен глобальный доступ к этим переменным, возможно ли это?

Наконец, чтобы уточнить, я не хочу использовать метод ReadFromText, я могу использовать переменную времени выполнения там, хотя включение параметров времени выполнения в dict, созданный из файла csv, будет дорогостоящим, так как я работаю с огромные CSV-файлы.

1 Ответ

0 голосов
/ 05 сентября 2018

Для меня это сработало, объявив cloud_options и custom_options как global:

import argparse, logging

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class CustomPipelineOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--path',
            type=str,
            help='csv storage path')
        parser.add_value_provider_argument(
            '--table_name',
            type=str,
            help='Table Id')

def rewrite_values(element):
        """ Rewrite default env values"""
        # global project_id
        # global campaign_id
        # global organization_id
        # global language
        # global file_path
        try:
            logging.info("File Path with str(): {}".format(str(custom_options.path.get())))
            logging.info("----------------------------")
            logging.info("element: {}".format(element))
            project_id = str(cloud_options.project)
            file_path = custom_options.path.get()
            table_name = custom_options.table_name.get()

            logging.info("project: {}".format(project_id))
            logging.info("File path: {}".format(file_path))
            logging.info("language: {}".format(table_name))
            logging.info("----------------------------")
        except Exception as e:
            logging.info("Error format----------------------------")
            raise KeyError(e)

        return file_path


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  global cloud_options
  global custom_options

  pipeline_options = PipelineOptions(pipeline_args)
  cloud_options = pipeline_options.view_as(GoogleCloudOptions)
  custom_options = pipeline_options.view_as(CustomPipelineOptions)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  # Beginning of the pipeline
  p = beam.Pipeline(options=pipeline_options)

  init_data = (p
               | beam.Create(["Start"])
               | beam.FlatMap(rewrite_values))

  result = p.run()
  # result.wait_until_finish

if __name__ == '__main__':
  run()

После установки переменных PROJECT и BUCKET я поставил шаблон с:

python script.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --staging_location gs://$BUCKET/staging \
    --temp_location gs://$BUCKET/temp \
    --template_location gs://$BUCKET/templates/global_options

И выполните его, указав опции path и table_name:

gcloud dataflow jobs run global_options \
   --gcs-location gs://$BUCKET/templates/global_options \
   --parameters path=test_path,table_name=test_table

И параметры среды выполнения, похоже, хорошо записываются в FlatMap:

enter image description here

...