Шаблон потока данных не использует параметры времени выполнения - PullRequest
1 голос
/ 23 мая 2019

Я использую шаблон потока данных для запуска облачного потока данных

Я предоставляю некоторые значения по умолчанию и вызываю шаблон.Поток данных правильно отображает параметры конвейера в сводке конвейера потока данных.но он не принимает значения времени выполнения.

class Mypipeoptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--preprocess_indir',
            help='GCS path of the data to be preprocessed',
            required=False,
            default='gs://default/dataset/'
        )
        parser.add_value_provider_argument(
            '--output_dir_train',
            help='GCS path of the preprocessed train data',
            required=False,
            default='gs://default/train/'
        )
        parser.add_value_provider_argument(
            '--output_dir_test',
            help='GCS path of the preprocessed test data',
            required=False,
            default='gs://default/test/'
        )
        parser.add_value_provider_argument(
            '--output_dir_validate',
            help='GCS path of the preprocessed validate data',
            required=False,
            default='gs://default/validate/'
        )

Затем я проверяю, доступны ли значения

p = beam.Pipeline(options=args)

    if args.preprocess_indir.is_accessible():
        input_dir = args.preprocess_indir
    else:
        input_dir = getValObj(args.preprocess_indir)


    if args.output_dir_train.is_accessible():
        output_train = args.output_dir_train
    else:
        output_train = getValObj(args.output_dir_train)

    if args.output_dir_test.is_accessible():
        output_test = args.output_dir_test
    else:
        output_test = getValObj(args.output_dir_test)

    if args.output_dir_validate.is_accessible():
        output_validate = args.output_dir_validate
    else:
        output_validate = getValObj(args.output_dir_validate)

Теперь при вызове шаблона я мог видеть значения, которые я хотел передать, как(Mypipeoptions) параметр конвейера parmater, но он не используется в фактическом прогоне, вместо этого используются параметры по умолчанию, заданные

1 Ответ

2 голосов
/ 24 мая 2019

Я думаю, что нашел решение, я назначал параметры времени выполнения переменным и затем передавал их на вход или выход.

Когда я непосредственно передавал параметры времени выполнения источнику или приемнику, это работало.Как и в приведенном ниже

 'Write train dataset to destination' >> beam.io.tfrecordio.WriteToTFRecord(
        file_path_prefix=args.output_dir_train
    ) 

Я считаю, что часть, которую я пропустил, заключалась в том, что при создании шаблона он строит график, и только параметры времени выполнения могут быть подключены к его времени выполнения.Другие вычисления уже сделаны при построении графика.

Пожалуйста, исправьте меня, если я ошибаюсь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...