Применение преобразования к одному выходному тегу - PullRequest
0 голосов
/ 11 декабря 2018

Я думаю, что у меня есть функция, которая производит два выхода (пожалуйста, исправьте меня, если я ошибаюсь):

PCollection<String> words = ...;

final TupleTag<String> shortWordsTag = new TupleTag<String>(){};

PCollectionTuple results =
     words.apply(
         ParDo
         .of(new DoFn<String, String>() {
             @ProcessElement
             public void processElement(ProcessContext context) {
                 String word = context.element();
                 if (word.length() < 5) {
                     context.output(shortWordsTag, word);
                 } else {
                     context.output(word);
             }

Теперь я хотел бы вызвать другую функцию, но применить ее только к одному из них.выходы.Примерно так:

results.apply(
    ParDo
    .of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
            String word = context.element();
            // do stuff, but should only have words with length < 5 here
    }
)

Я вижу некоторые примеры, которые используют withOutputTags, но этот метод, кажется, принимает более одного тега (тег и список тегов), и я не уверенкак использовать его для моего сценария.

Как я могу указать, что мой results.apply будет вызываться только для данных, которые выводятся в тег shortWordsTag?

1 Ответ

0 голосов
/ 11 декабря 2018

Правильный способ работы с несколькими выходами в одном преобразовании в Apache Beam действительно использует PCollectionTuple и withOutputTags, как вы упомянули.

В документации по Apache Beam вы можете найти некоторые действительно хорошиепримеры того, как настроить преобразование с несколькими выходами, используя разные теги для каждого из них:

Кроме того, если вы посетите раздел 4.5.2 во второй ссылке выше, вы найдете пример того, как излучать на несколько выходов вваш DoFn.Короче говоря, и используя основной код, которым вы поделились, вам нужно будет сделать следующее:

PCollectionTuple results = [...].withOutputTags(MAIN_TAG, LIST_OF_ADDITIONAL_TAGS);

results.get(YOUR_DESIRED_TAG).apply(...);

Вызов метода get( ) для PCollectionTuple вернет PCollection, связанную сTupleTag, который вы будете передавать внутри метода.

...