Apache Beam - пропустить шаг конвейера - PullRequest
0 голосов
/ 07 декабря 2018

Я использую Apache Beam для настройки конвейера, состоящего из 2 основных шагов:

  • преобразование данных с использованием преобразования Beam
  • загрузка преобразованных данных в BigQuery

Настройка конвейера выглядит следующим образом:

myPCollection = (org.apache.beam.sdk.values.PCollection<myCollectionObjectType>)myInputPCollection
                .apply("do a parallel transform"),
                     ParDo.of(new MyTransformClassName.MyTransformFn()));

 myPCollection
    .apply("Load BigQuery data for PCollection",
            BigQueryIO.<myCollectionObjectType>write()
            .to(new MyDataLoadClass.MyFactTableDestination(myDestination))
            .withFormatFunction(new MyDataLoadClass.MySerializationFn())

Я смотрел на этот вопрос:

Apache Beam: пропуск шагов в уже построенномконвейер

, который говорит о том, что я могу каким-то образом динамически изменить вывод, на который я могу передавать данные, следуя параллельному преобразованию на шаге 1.

Как мне это сделать?Я не знаю, как выбрать, передавать или нет myPCollection с шага 1 на шаг 2. Мне нужно пропустить шаг 2, если объект на myPCollection с шага 1 равен null.

1 Ответ

0 голосов
/ 07 декабря 2018

Вы просто не излучаете элемент из вашего MyTransformClassName.MyTransformFn, когда вы не хотите его на следующем шаге, например, что-то вроде этого:

class MyTransformClassName.MyTransformFn extends...
  @ProcessElement
  public void processElement(ProcessContext c, ...) {
    ...
    result = ...
    if (result != null) {
       c.output(result);   //only output something that's not null
    }
  }

Таким образом, нули не достигаютследующий шаг.

Подробнее см. в разделе ParDo данного руководства: https://beam.apache.org/documentation/programming-guide/#pardo

...