Лучший способ ответить на этот пример - пример. Этот пример доступен в Beam .
Предположим, что вы хотите запустить конвейер подсчета слов (например, подсчитать, сколько раз каждое слово появляется в документе). Для этого вам нужно разбить строки в файле на отдельные слова. Учтите, что вы также хотите рассчитывать длины слов индивидуально. Ваше преобразование расщепления будет выглядеть так:
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText(known_args.input) # Read in the file
# with_outputs allows accessing the explicitly tagged outputs of a DoFn.
split_lines_result = (lines
| beam.ParDo(SplitLinesToWordsFn()).with_outputs(
SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
main='words'))
short_words = split_lines_result['words']
character_count = split_lines_result[
SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]
В этом случае каждый из них отличается PCollection
с правильными элементами. DoFn
будет отвечать за разделение своих выходов, и он делает это, помечая элементы. См:
class SplitLinesToWordsFn(beam.DoFn):
OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
def process(self, element):
# yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
# collection.
yield pvalue.TaggedOutput(
self.OUTPUT_TAG_CHARACTER_COUNT, len(element))
words = re.findall(r'[A-Za-z\']+', element)
for word in words:
# yield word to add it to the main collection.
yield word
Как видите, для основного выхода вам не нужно помечать элементы, а для других выходов, которые вы делаете.