Я попытался осушить трубопровод программно. Первая часть кода запускает конвейер, который вызывается с использованием отдельного потока. Затем программа некоторое время спит, а затем пытается истощить конвейер.
Я пытался запустить в потоке данных, и это не сработало. Конвейер запускается, но затем оставшаяся часть кода выглядит как никогда не выполненная.
Пожалуйста, дайте мне знать, если это возможно.
Я пытался посмотреть журналирование, чтобы увидеть, сколько программы выполнено, но похоже, что Dataflow будет регистрировать только рабочие журналы, поэтому не мог видеть до того момента, где он выполнялся. Я считаю, что код после запуска конвейера не выполняется.
DataflowRunner runner = DataflowRunner.fromOptions(options);
DataflowPipelineJob pp = null;
// to run the pipeline which calls pipeline.run
new Thread(() -> runMethod(pp, runner, options)).start();
//Draining below
try {
Thread.sleep(360000);
GoogleCredential credential;
credential = GoogleCredential.getApplicationDefault();
if (credential.createScopedRequired()) {
credential = credential.createScoped(Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"));
}
HttpTransport httpTransport;
httpTransport = GoogleNetHttpTransport.newTrustedTransport();
JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
Dataflow client = new Dataflow.Builder(httpTransport, jsonFactory, credential)
.setApplicationName("Google Cloud Platform Sample 1")
.build();
Job content = new Job();
content.setProjectId("sample-id");
content.setId(pp.getJobId());
content.setRequestedState("JOB_STATE_DRAINING");
client.projects().jobs()
.update("sample-id", pp.getJobId(), content)
.execute();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (GeneralSecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}