У меня есть следующий рабочий процесс для пакета потока данных:
- Обновление кода в шаблоне (основной класс java)
- Загрузка шаблона в область памяти в GCP
- Запуск потока данных API-интерфейсом Dataflow в AppEngine
В конце завершения моего конвейера я публикую публикацию в PubSub, и у меня есть облачная функция, которая выбирает это сообщение по определенной теме и отправляет слабое уведомление.,Мне пришлось добавить pipeline.run().waitUntilItFinish()
, чтобы в конце вызвать уведомление PubSub.В противном случае он будет публиковать каждое преобразование, которое успешно обрабатывает PCollection
У меня есть текущий основной класс для моего конвейера:
public static void main(String[] args) throws Exception {
final CustomOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomOptions.class);
final Pair<String, LocalDatastoreHelper> datastorePair = DatastoreUpdater.createUpdateDatastoreFetchParameters(options.getEnv());
final String datastoreProjectId = datastorePair.getKey();
final Pipeline pipeline = Pipeline.create(options);
DataPreparation dataPreparation = new DataPreparation(pipeline, options);
GroupAccountProcessing.groupAccountUpdate(options, dataPreparation, datastoreProjectId);
GroupAccountProcessing.groupAccountSeatsUpdate(options, dataPreparation, datastoreProjectId);
TempAccountProcessing.processTempAccount(options, dataPreparation, datastoreProjectId);
DigitalAccountProcessing.processDigital(options, dataPreparation, datastoreProjectId);
ProductAccountProcessing.processProduct(options, dataPreparation, datastoreProjectId);
try {
PipelineResult.State state = pipeline.run().waitUntilFinish();
SlackNotification.notifySlack(options.getTopic(), options.getJobName(), options.getRunMode(), state);
} catch (Exception e) {
e.printStackTrace();
LocalDatastoreHelper helper = datastorePair.getValue();
if (helper != null) {
helper.stop();
System.setProperty("DATASTORE_EMULATOR_HOST", "");
System.setProperty("DATASTORE_PROJECT_ID", "");
}
}
}
Этот файл работает, только когда я запускаю конвейер локально с помощью командылиния, чтобы запустить его в облаке.Есть ли обходной путь, чтобы моя загрузка шаблона работала и не выкидывала исключение