Инструкция If для шагов в конвейере потока данных Apache Beam (Python) - PullRequest
0 голосов
/ 26 сентября 2019

Мне было интересно, возможно ли использовать оператор if в конвейере лучей для принятия другого преобразования на основе разных сценариев.Например:

1) Сделайте один из входных аргументов засыпкой / регулярным, а затем на основе этого входного аргумента он решит, начинать ли с

(p 
            | fileio.MatchFiles(known_args.input_bucket)
            | fileio.ReadMatches()
            | beam.Map(lambda file: file.metadata.path, json.loads(file.read_utf8())))

или

p | beam.io.ReadFromText(known_args.input_file_name)

2) Если имя файла содержит определенное название страны (например, США), позвоните TransformUSA(beam.DoFn), иначе позвоните TransformAllCountries(beam.DoFn)

Извините, если это не очень хороший вопрос, я не виделэто где-то еще, и я пытаюсь сделать мой код модульным вместо того, чтобы иметь отдельные конвейеры

1 Ответ

2 голосов
/ 26 сентября 2019

Вполне возможно иметь оператор if для вашего конвейера, но помните, что все должно быть известно во время строительства конвейера.Так, например:

with beam.Pipeline(...) as p:
  if known_args.backfill == True:
    input_pcoll = (p
                   | fileio.MatchFiles(known_args.input_bucket)
                   | fileio.ReadMatches()
                   | beam.Map(lambda file: file.read_utf8().split('\n'))
  else:
    input_pcoll = (p
                   | beam.io.ReadFromText(known_args.input_file_name)

А затем для вашего TransformUSA вы бы сделали что-то вроде:

if 'USA' in known_args.input_file_name:
  next_pcoll = input_pcoll | beam.ParDo(TransformUSA())
else:
  next_pcoll = input_pcoll | beam.ParDo(TransformAllCountries())

Имеет ли это смысл?

...