У меня есть конвейер Apache Beam, построенный на python.Я читаю строки из CSV-файла. Затем существуют общие шаги конвейера для всех коллекций.Это отлично работает.Для pcollections, которые прибывают из определенного имени файла, я хочу выполнить пару дополнительных шагов.Поэтому я помечаю pcollections в этом файле и выполняю дополнительные шаги для этих помеченных коллекций.Когда я запускаю конвейер в «Потоке данных», он выдает ошибку «Рабочий процесс не выполнен. Причины: Ожидается, что в пользовательском источнике будет ненулевое число разбиений».
Я протестировал, и это нормально работает на «DirectRunner».
lines = (p | beam.io.ReadFromText(input_file_path, skip_header_lines=1))
Generic = (lines | <"Do generic logic for all pCollections">)
tagged_lines = (lines | beam.ParDo(Tag(),input_file_path).with_outputs(Tag.TAG_OPTOUT,Tag.TAG_BOUNCE))
Optouts = (tagged_lines[Tag.TAG_OPTOUT] | <"Do logic 1">)
Bounces = (tagged_lines[Tag.TAG_BOUNCE] | <"Do logic 2">)
class Tag(beam.DoFn):
TAG_OPTOUT = 'OPTOUT'
TAG_BOUNCE = 'BOUNCE'
def process(self, element,input_file_path):
input_file = input_file_path.get()
if "optout" in input_file:
yield pvalue.TaggedOutput(self.TAG_OPTOUT, element)
elif "bounce" in input_file:
yield pvalue.TaggedOutput(self.TAG_BOUNCE, element)