Я использую Apache Beam для настройки конвейера, состоящего из 2 основных шагов:
- преобразование данных с использованием преобразования Beam
- загрузка преобразованных данных в BigQuery
Настройка конвейера выглядит следующим образом:
myPCollection = (org.apache.beam.sdk.values.PCollection<myCollectionObjectType>)myInputPCollection
.apply("do a parallel transform"),
ParDo.of(new MyTransformClassName.MyTransformFn()));
myPCollection
.apply("Load BigQuery data for PCollection",
BigQueryIO.<myCollectionObjectType>write()
.to(new MyDataLoadClass.MyFactTableDestination(myDestination))
.withFormatFunction(new MyDataLoadClass.MySerializationFn())
Я смотрел на этот вопрос:
Apache Beam: пропуск шагов в уже построенномконвейер
, который говорит о том, что я могу каким-то образом динамически изменить вывод, на который я могу передавать данные, следуя параллельному преобразованию на шаге 1.
Как мне это сделать?Я не знаю, как выбрать, передавать или нет myPCollection
с шага 1 на шаг 2. Мне нужно пропустить шаг 2, если объект на myPCollection
с шага 1 равен null
.