Побочный вывод в ParDo | Apache Beam Python SDK - PullRequest
0 голосов
/ 14 сентября 2018

Поскольку документация доступна только для JAVA, я не мог понять, что это значит.

В нем указано - "В то время как ParDo всегда производит основную выходную PCollection (как возвращаемое значение из применения), вы также можете сделать так, чтобы ParDo производил любое количество дополнительных выходных PCollections. Если вы решите иметь несколько выходов, ваш ParDo вернет все выходные PCollections (включая основной выход), связанные вместе. Например, в Java выходные PCollections связаны в типобезопасный PCollectionTuple. "

Я понимаю, что означает объединенный пакет, но если я получаю тег в моем DoFn, он дает пакет с другими пустыми выходными данными на ходу и дает другие выходные данные, когда они встречаются в коде? или он ждет, когда все выходы будут готовы для ввода, и выводит их все вместе в связку?

В документации не так много ясности. Хотя я думаю, что он не ждет и просто уступает, когда сталкивается, но мне все еще нужно понять, что происходит.

1 Ответ

0 голосов
/ 15 сентября 2018

Лучший способ ответить на этот пример - пример. Этот пример доступен в Beam .

Предположим, что вы хотите запустить конвейер подсчета слов (например, подсчитать, сколько раз каждое слово появляется в документе). Для этого вам нужно разбить строки в файле на отдельные слова. Учтите, что вы также хотите рассчитывать длины слов индивидуально. Ваше преобразование расщепления будет выглядеть так:

with beam.Pipeline(options=pipeline_options) as p:

    lines = p | ReadFromText(known_args.input)  # Read in the file

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    split_lines_result = (lines
                          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
                              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
                              main='words'))

    short_words = split_lines_result['words']
    character_count = split_lines_result[
        SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]

В этом случае каждый из них отличается PCollection с правильными элементами. DoFn будет отвечать за разделение своих выходов, и он делает это, помечая элементы. См:

class SplitLinesToWordsFn(beam.DoFn):
  OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'

  def process(self, element):
    # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
    # collection.
    yield pvalue.TaggedOutput(
        self.OUTPUT_TAG_CHARACTER_COUNT, len(element))

    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      # yield word to add it to the main collection.
      yield word

Как видите, для основного выхода вам не нужно помечать элементы, а для других выходов, которые вы делаете.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...