Я использую шаблон потока данных для запуска облачного потока данных
Я предоставляю некоторые значения по умолчанию и вызываю шаблон.Поток данных правильно отображает параметры конвейера в сводке конвейера потока данных.но он не принимает значения времени выполнения.
class Mypipeoptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--preprocess_indir',
help='GCS path of the data to be preprocessed',
required=False,
default='gs://default/dataset/'
)
parser.add_value_provider_argument(
'--output_dir_train',
help='GCS path of the preprocessed train data',
required=False,
default='gs://default/train/'
)
parser.add_value_provider_argument(
'--output_dir_test',
help='GCS path of the preprocessed test data',
required=False,
default='gs://default/test/'
)
parser.add_value_provider_argument(
'--output_dir_validate',
help='GCS path of the preprocessed validate data',
required=False,
default='gs://default/validate/'
)
Затем я проверяю, доступны ли значения
p = beam.Pipeline(options=args)
if args.preprocess_indir.is_accessible():
input_dir = args.preprocess_indir
else:
input_dir = getValObj(args.preprocess_indir)
if args.output_dir_train.is_accessible():
output_train = args.output_dir_train
else:
output_train = getValObj(args.output_dir_train)
if args.output_dir_test.is_accessible():
output_test = args.output_dir_test
else:
output_test = getValObj(args.output_dir_test)
if args.output_dir_validate.is_accessible():
output_validate = args.output_dir_validate
else:
output_validate = getValObj(args.output_dir_validate)
Теперь при вызове шаблона я мог видеть значения, которые я хотел передать, как(Mypipeoptions) параметр конвейера parmater, но он не используется в фактическом прогоне, вместо этого используются параметры по умолчанию, заданные