Ошибка при разделении pcollections на Dataner Runner - PullRequest
1 голос
/ 23 мая 2019

У меня есть конвейер Apache Beam, построенный на python.Я читаю строки из CSV-файла. Затем существуют общие шаги конвейера для всех коллекций.Это отлично работает.Для pcollections, которые прибывают из определенного имени файла, я хочу выполнить пару дополнительных шагов.Поэтому я помечаю pcollections в этом файле и выполняю дополнительные шаги для этих помеченных коллекций.Когда я запускаю конвейер в «Потоке данных», он выдает ошибку «Рабочий процесс не выполнен. Причины: Ожидается, что в пользовательском источнике будет ненулевое число разбиений».

Я протестировал, и это нормально работает на «DirectRunner».

lines = (p | beam.io.ReadFromText(input_file_path, skip_header_lines=1))

Generic = (lines | <"Do generic logic for all pCollections">)

tagged_lines = (lines | beam.ParDo(Tag(),input_file_path).with_outputs(Tag.TAG_OPTOUT,Tag.TAG_BOUNCE))

Optouts = (tagged_lines[Tag.TAG_OPTOUT] | <"Do logic 1">)

Bounces = (tagged_lines[Tag.TAG_BOUNCE] | <"Do logic 2">)

class Tag(beam.DoFn):
    TAG_OPTOUT = 'OPTOUT'
    TAG_BOUNCE = 'BOUNCE'
    def process(self, element,input_file_path):
        input_file = input_file_path.get()
        if "optout" in input_file:
            yield pvalue.TaggedOutput(self.TAG_OPTOUT, element)
        elif "bounce" in input_file:
        yield pvalue.TaggedOutput(self.TAG_BOUNCE, element)
...