Поток данных для программного слива конвейера - PullRequest
0 голосов
/ 25 августа 2018

Я попытался осушить трубопровод программно. Первая часть кода запускает конвейер, который вызывается с использованием отдельного потока. Затем программа некоторое время спит, а затем пытается истощить конвейер. Я пытался запустить в потоке данных, и это не сработало. Конвейер запускается, но затем оставшаяся часть кода выглядит как никогда не выполненная. Пожалуйста, дайте мне знать, если это возможно.

Я пытался посмотреть журналирование, чтобы увидеть, сколько программы выполнено, но похоже, что Dataflow будет регистрировать только рабочие журналы, поэтому не мог видеть до того момента, где он выполнялся. Я считаю, что код после запуска конвейера не выполняется.

DataflowRunner runner = DataflowRunner.fromOptions(options);
           DataflowPipelineJob pp = null; 
        // to run the pipeline which calls pipeline.run  
    new Thread(() -> runMethod(pp, runner, options)).start();   

        //Draining below
        try {
            Thread.sleep(360000);           
            GoogleCredential credential;
            credential = GoogleCredential.getApplicationDefault();
               if (credential.createScopedRequired()) {
                   credential = credential.createScoped(Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"));
               }
               HttpTransport httpTransport;        
               httpTransport = GoogleNetHttpTransport.newTrustedTransport();
               JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
                  Dataflow client = new Dataflow.Builder(httpTransport, jsonFactory, credential)
                          .setApplicationName("Google Cloud Platform Sample 1")
                          .build(); 
               Job content = new Job();
               content.setProjectId("sample-id");
               content.setId(pp.getJobId());
               content.setRequestedState("JOB_STATE_DRAINING");
               client.projects().jobs()
                       .update("sample-id", pp.getJobId(), content)
                       .execute();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }  catch (GeneralSecurityException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
           }

1 Ответ

0 голосов
/ 29 августа 2018

Трубопровод будет опорожняться до тех пор, пока в полетных данных будет .Он прекратит прием данных, и когда все данные будут обработаны, задание останавливается.

1 - откуда вы получаете данные 2 - все данные могут быть обработаны во время сна

...