Как программно убить задание Beam Dataflow от рабочих - PullRequest
0 голосов
/ 18 марта 2020

Я хочу завершить работу Apache Beam программно с рабочих узлов, используя Java Beam SDK. В идеале я хотел бы, чтобы это решение было независимым от бегуна, но подойдет даже решение, определяемое DataflowRunner c.

Я не хочу использовать ловушки отключения, я ищу что-то, что поддерживается лучом API.

Самое близкое, что я нашел, к тому, что я хочу, это org.apache.beam.runners.dataflow.util.MonitoringUtil::getGcloudCancelCommand. Однако это просто возвращает String с командой, которую необходимо выполнить для отмены задания. Это не отменяет работу из JVM.

Ответы [ 2 ]

1 голос
/ 18 марта 2020

Есть две возможные команды, которые можно использовать для остановки задания потока данных: Cancel и Drain. Это можно сделать, введя команду с помощью интерфейса мониторинга потока данных или интерфейса командной строки потока данных. Пожалуйста, обратитесь к официальной документации .

Кроме того, вы можете проверить API обновления REST projects.locations.jobs.update, чтобы обновить состояние существующего задания потока данных.

Использовать метод Rest Update, с этим телом, подробнее см. В Руководство разработчика Google :

{ "requestedState": "JOB_STATE_DRAINING" }

Кроме того, я настоятельно рекомендую вам поискать этот поток Stackoverflow . Надеюсь, это поможет.

0 голосов
/ 26 марта 2020

Завершение (истощение) задания из DoFn.ProcessElement:

import java.io.IOException;

import com.google.api.services.dataflow.model.Job;

import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;

class DrainDoFn<S, T> extends DoFn<S, T> {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext pc) throws IOException {
        PipelineOptions options = pc.getPipelineOptions();
        DataflowPipelineOptions dpOptions =
            options.as(DataflowPipelineOptions.class);

        DataflowWorkerHarnessOptions dwhOptions =
            options.as(DataflowWorkerHarnessOptions.class);

        String jobId = dwhOptions.getJobId();
        DataflowClient dataflowClient = DataflowClient.create(dpOptions);
        Job jobDescription = dataflowClient.getJob(jobId);
        jobDescription.setRequestedState("JOB_STATE_DRAINING");
        dataflowClient.updateJob(jobId, jobDescription);
    }
}
...