Управление выполнением задания на основе исключений в простом чанке в Spring Batch - PullRequest
0 голосов
/ 13 сентября 2018

У меня простое задание на обработку чанка CSV.

Я хотел бы изменить поток выполнения, когда во время обработки есть определенный тип ошибки (например, неверная структура строки)

Во избежание появления ошибок я должен предоставить пользовательский exceptionHandler, который поглотит исключение при разборе:

@Bean
fun processCsvStep(
    stepBuilderFactory: StepBuilderFactory,
    reader: ItemReader<InputRow>,
    processor: ItemProcessor<InputRow, OutputObject>,
    writer: ItemWriter<OutputObject>
) = stepBuilderFactory.get(PROCESS_CSV_STEP)
    .chunk<InputRow, OutputObject>(
        CHUNKS_NUMBER
    )
    .reader(reader)
    .processor(processor)
    .writer(writer)
    .exceptionHandler { context: RepeatContext, throwable: Throwable ->
        context.setTerminateOnly()
        logger.error { "Exception during parsing: ${throwable.message}" }
    }
    .build()!!

Тогда в моей работе я могу рассчитывать только на счет отката:

@Bean
fun createCsvJob(jobs: JobBuilderFactory, processCsvStep: Step, moveCsvStep: Step, moveFailedCsvStep: Step) = jobs.get(PROCESS_CSV_JOB)
    .start(processCsvStep)
    .next { jobExecution: JobExecution, stepExecution: StepExecution ->
        return@next when (stepExecution.rollbackCount) {
            0 -> FlowExecutionStatus.COMPLETED
            else -> FlowExecutionStatus.FAILED
        }

    }
    .on(FlowExecutionStatus.FAILED.name)
    .to(moveFailedCsvStep)
    .on(FlowExecutionStatus.COMPLETED.name)
    .to(moveCsvStep)
    .end()
    .build()!!

Есть ли способ передать информацию из обработчика исключений в JobExecutionDecider? Я хотел бы принять решение об исполнении на основе типа исключения, которое произошло во время синтаксического анализа. Это возможно?

1 Ответ

0 голосов
/ 13 сентября 2018

Я хотел бы принять решение об исполнении на основе типа исключения, возникшего во время синтаксического анализа.Возможно ли это?

Вы можете получить доступ к исключению, которое произошло во время шага, от решающего лица до stepExecution#getFailureExceptions.Вот пример:

import java.util.Arrays;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return items -> {
            for (Integer item : items) {
                if (items.contains(3)) {
                    throw new IllegalArgumentException("no 3!");
                }
                System.out.println("item = " + item);
            }
        };
    }

    @Bean
    public Step step1() {
        return steps.get("step1")
                .<Integer, Integer>chunk(5)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public Step step2() {
        return steps.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step step3() {
        return steps.get("step3")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step3");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public JobExecutionDecider decider() {
        return (jobExecution, stepExecution) -> {
            int rollbackCount = stepExecution.getRollbackCount();
            List<Throwable> failureExceptions = stepExecution.getFailureExceptions();
            System.out.println("rollbackCount = " + rollbackCount);
            System.out.println("failureExceptions = " + failureExceptions);
            // make the decision based on rollbackCount and/or failureExceptions and return status accordingly
            return FlowExecutionStatus.COMPLETED;
        };
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step1())
                .on("*").to(decider())
                .from(decider()).on("COMPLETED").to(step2())
                .from(decider()).on("FAILED").to(step3())
                .build()
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

В этом примере, если исключение возникает во время step1, решающий может получить его из выполнения шага и принять соответствующее решение (перейдите к step2 или step3).

Так что я не уверен, что вам действительно нужен обработчик исключений и способ передачи информации решающему.То же самое относится и к тому, что вы хотите принять решение на основе rollbackCount, commitCount, readCount или любого другого показателя.

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...