Условный поток Spring Batch, вызывающий сбой состояния выхода задания - PullRequest
0 голосов
/ 01 марта 2019

У меня проблемы с подпружиненным потоком, который выполняет следующее (упрощенно)

  1. выполняет шаг (всегда)
  2. во время выполнения, решите, что делать дальше (jobexecutiondecider)
  3. если решающий элемент решает «продолжить» -> выполнить более сложный внутренний поток
  4. если решающий решает «завершить» -> не выполнять внутренний поток, а просто завершить задание

С учетом следующего MVE:

@Configuration
@EnableBatchProcessing
public class BatchExample {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job exampleJob() {
        final FlowBuilder<Flow> innerFlowBuilder = new FlowBuilder<>("innerFlow");
        final Flow innerFlow = innerFlowBuilder
            .start(dummyStep("first innerflow step"))
            .next(dummyStep("second innerflow step"))
            .next(dummyStep("last innerflow step"))
            .build();

        final FlowBuilder<Flow> outerFlowBuilder = new FlowBuilder<>("outerFlow");
        final Flow outerFlow = outerFlowBuilder
            .start(dummyStep("always execute me"))
            .next(decide()).on("CONTINUE").to(innerFlow)
            .from(decide()).on("COMPLETED").end("COMPLETED")
            .build();

        return jobBuilderFactory.get("exampleJob")
            .start(outerFlow)
            .end()
            .build();
    }

    private Step dummyStep(String arg) {
        return stepBuilderFactory.get("step_" + arg)
            .tasklet(dummyTasklet(arg))
            .build();
    }

    private Tasklet dummyTasklet(String arg) {
        return new DummyTasklet(arg);
    }

    private DummyDecider decide() {
        return new DummyDecider();
    }

    class DummyTasklet implements Tasklet {

        private final String arg;

        DummyTasklet(String arg) {
            this.arg = arg;
        }

        @Override
        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
            System.out.println("hello from dummy tasklet: " + arg);
            return RepeatStatus.FINISHED;
        }
    }

    class DummyDecider implements JobExecutionDecider {

        @Override
        public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
            final Random random = new Random();
            final int i = random.nextInt();

            if (i % 2 == 0) {
                return new FlowExecutionStatus("CONTINUE");
            } else {
                return FlowExecutionStatus.COMPLETED;
            }
        }
    }    
}

При выполнении кода поток «продолжить» идет хорошо, но «завершенный» поток всегда завершается с состоянием невыполненного задания.

 Job: [FlowJob: [name=exampleJob]] completed with the following
 parameters: [{}] and the following status: [FAILED]

Как мне завершить работу со статусом ЗАВЕРШЕНО?Другими словами, что я сделал не так с кодировкой потока?

Код можно запустить с помощью приложения с загрузочной пружиной

@SpringBootApplication
public class FlowApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(FlowApplication.class, args);
    }

    @Autowired
    private Job exampleJob;
    @Autowired
    private JobLauncher jobLauncher;


    @Override
    public void run(ApplicationArguments args) throws Exception {
        jobLauncher.run(exampleJob, new JobParameters());
    }
}

Редактировать

При реализации предложенного ответа:

final Flow outerFlow = outerFlowBuilder
            .start(dummyStep("always execute me"))
            .on("*").to(decide())
            .from(decide()).on("CONTINUE").to(innerFlow)
            .from(decide()).on("COMPLETED").end("COMPLETED")
            .build();

И принудительное принятие решения "ПРОДОЛЖИТЬ":

class DummyDecider implements JobExecutionDecider {
        @Override
        public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
            return new FlowExecutionStatus("CONTINUE");
        }
    }

внутренний поток больше не запускается:

Executing step: [step_always execute me]
hello from dummy tasklet: always execute me
Job: [FlowJob: [name=exampleJob]] completed with the following parameters: [{}] and the following status: [FAILED]
Job: [FlowJob: [name=exampleJob]] launched with the following parameters: [{}]
Step already complete or not restartable, so no action to execute: StepExecution: id=0, version=3, name=step_always execute me, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
Job: [FlowJob: [name=exampleJob]] completed with the following parameters: [{}] and the following status: [FAILED]

1 Ответ

0 голосов
/ 01 марта 2019

Ваш outerFlow должен быть определен следующим образом:

final Flow outerFlow = outerFlowBuilder
        .start(dummyStep("always execute me"))
        .on("*").to(decide())
        .from(decide()).on("CONTINUE").to(innerFlow)
        .from(decide()).on("COMPLETED").end("COMPLETED")
        .build();

Я проверил это на вашем примере (спасибо за MVE!), И задание не завершилось неудачей, когда решающее значение вернуло COMPLETED.

Причина в том, что без направления всех результатов шага dummyStep("always execute me") на решающее устройство с помощью .on("*").to(decide()) определение потока неверно.

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...