Поток данных Проблемы с загрузкой шаблона с помощью pipe.waitUntilFinish () и публикацией в PubSub после - PullRequest
0 голосов
/ 11 декабря 2018

У меня есть следующий рабочий процесс для пакета потока данных:

  1. Обновление кода в шаблоне (основной класс java)
  2. Загрузка шаблона в область памяти в GCP
  3. Запуск потока данных API-интерфейсом Dataflow в AppEngine

В конце завершения моего конвейера я публикую публикацию в PubSub, и у меня есть облачная функция, которая выбирает это сообщение по определенной теме и отправляет слабое уведомление.,Мне пришлось добавить pipeline.run().waitUntilItFinish(), чтобы в конце вызвать уведомление PubSub.В противном случае он будет публиковать каждое преобразование, которое успешно обрабатывает PCollection

У меня есть текущий основной класс для моего конвейера:

public static void main(String[] args) throws Exception {
    final CustomOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomOptions.class);
    final Pair<String, LocalDatastoreHelper> datastorePair = DatastoreUpdater.createUpdateDatastoreFetchParameters(options.getEnv());
    final String datastoreProjectId = datastorePair.getKey();
    final Pipeline pipeline = Pipeline.create(options);
    DataPreparation dataPreparation = new DataPreparation(pipeline, options);

    GroupAccountProcessing.groupAccountUpdate(options, dataPreparation, datastoreProjectId);
    GroupAccountProcessing.groupAccountSeatsUpdate(options, dataPreparation, datastoreProjectId);
    TempAccountProcessing.processTempAccount(options, dataPreparation, datastoreProjectId);
    DigitalAccountProcessing.processDigital(options, dataPreparation, datastoreProjectId);
    ProductAccountProcessing.processProduct(options, dataPreparation, datastoreProjectId);

    try {
        PipelineResult.State state = pipeline.run().waitUntilFinish();
        SlackNotification.notifySlack(options.getTopic(), options.getJobName(), options.getRunMode(), state);
    } catch (Exception e) {
        e.printStackTrace();
        LocalDatastoreHelper helper = datastorePair.getValue();
        if (helper != null) {
            helper.stop();
            System.setProperty("DATASTORE_EMULATOR_HOST", "");
            System.setProperty("DATASTORE_PROJECT_ID", "");
        }
    }
}

Этот файл работает, только когда я запускаю конвейер локально с помощью командылиния, чтобы запустить его в облаке.Есть ли обходной путь, чтобы моя загрузка шаблона работала и не выкидывала исключение

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...