Как запустить Spring Batch Job асинхронно - PullRequest
0 голосов
/ 09 декабря 2018

Я следовал за пружинным пакетным документом и не смог запустить свою работу асинхронно.

Итак, я запускаю задание из веб-контейнера, и задание будет запущено черезКонечная точка REST.

Я хотел получить идентификатор JobInstance , чтобы передать его в ответ перед завершением всей работы.Таким образом, они могут проверить статус задания позже с идентификатором JobInstance вместо ожидания.Но я не мог заставить это работать.Ниже приведен пример кода, который я пробовал.Пожалуйста, дайте мне знать, что я пропустил или неправильно.

BatchConfig для создания Async JobLauncher

@Configuration
public class BatchConfig {

    @Autowired
    JobRepository jobRepository;


    @Bean
    public JobLauncher simpleJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}

Контроллер

@Autowired
JobLauncher jobLauncher;

@RequestMapping(value="/trigger-job", method = RequestMethod.GET)
public Long workHard() throws Exception {
    JobParameters jobParameters = new JobParametersBuilder().
            addLong("time", System.currentTimeMillis())
            .toJobParameters();
    JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
    System.out.println(jobExecution.getJobInstance().getInstanceId());
    System.out.println("OK RESPONSE");
    return jobExecution.getJobInstance().getInstanceId();
}

И JobBuilder как компонент

@Component
public class BatchComponent {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    public Job customJob(String someParam) throws Exception {
        return jobBuilderFactory.get("personProcessor")
                .incrementer(new RunIdIncrementer()).listener(listener())
                .flow(personPorcessStep(someParam)).end().build();
    }


    private Step personPorcessStep(String someParam) throws Exception {
        return stepBuilderFactory.get("personProcessStep").<PersonInput, PersonOutput>chunk(1)
                .reader(new PersonReader(someParam)).faultTolerant().
                        skipPolicy(new DataDuplicateSkipper()).processor(new PersonProcessor())
                .writer(new PersonWriter()).build();
    }


    private JobExecutionListener listener() {
        return new PersonJobCompletionListener();
    }

    private class PersonInput {
        String firstName;

        public PersonInput(String firstName) {
            this.firstName = firstName;
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    private class PersonOutput {
        String firstName;

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    public class PersonReader implements ItemReader<PersonInput> {
        private List<PersonInput> items;
        private int count = 0;

        public PersonReader(String someParam) throws InterruptedException {
            Thread.sleep(10000L); //to simulate processing
            //manipulate and provide data in the read method
            //just for testing i have given some dummy example
            items = new ArrayList<PersonInput>();
            PersonInput pi = new PersonInput("john");
            items.add(pi);
        }

        @Override
        public PersonInput read() {
            if (count < items.size()) {
                return items.get(count++);
            }
            return null;
        }
    }


    public class DataDuplicateSkipper implements SkipPolicy {

        @Override
        public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
            if (exception instanceof DataIntegrityViolationException) {
                return true;
            }
            return true;
        }
    }


    private class PersonProcessor implements ItemProcessor<PersonInput, PersonOutput> {

        @Override
        public PersonOutput process(PersonInput item) throws Exception {
            return null;
        }
    }

    private class PersonWriter implements org.springframework.batch.item.ItemWriter<PersonOutput> {
        @Override
        public void write(List<? extends PersonOutput> results) throws Exception {
            return;
        }
    }

    private class PersonJobCompletionListener implements JobExecutionListener {
        public PersonJobCompletionListener() {
        }

        @Override
        public void beforeJob(JobExecution jobExecution) {

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("JOB COMPLETED");
        }
    }
}

Основная функция

@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
@EnableAsync
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}

Я использую конфигурации на основе аннотаций и использую gradle с нижеуказанным пакетным пакетом.

compile('org.springframework.boot:spring-boot-starter-batch')

Пожалуйста, дайте мне знать, если потребуется дополнительная информация.Я не смог найти ни одного примера для запуска этого общего варианта использования.

Спасибо за ваше время.

Ответы [ 4 ]

0 голосов
/ 29 августа 2019

В соответствии с весенней документацией для возврата ответа HTTP-запроса асинхронно необходимо использовать org.springframework.core.task.SimpleAsyncTaskExecutor.

Любая реализация пружинного интерфейса TaskExecutor может использоваться для управления асинхронным выполнением заданий.

документация для пакетной пружины

<bean id="jobLauncher"
  class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
</property>

0 голосов
/ 05 марта 2019

Несмотря на то, что у вас есть пользовательский jobLauncher, вы запускаете задание, используя jobLauncher по умолчанию, предоставленный Spring.Не могли бы вы, пожалуйста, autowire simpleJobLauncher в вашем контроллере и попробовать?

0 голосов
/ 23 марта 2019

Попробуйте, в вашей конфигурации вам нужно создать customJobLauncher с SimpleAsyncTaskExecutor , используя @ Bean (name = "myJobLauncher") , и то же самое будет использоваться @ Qualifier в вашем контроллере.

@Bean(name = "myJobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

в вашем контроллере

@Autowired
@Qualifier("myJobLauncher")
private JobLauncher jobLauncher;
0 голосов
/ 09 декабря 2018

JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);.Joblauncher будет ждать после завершения задания, прежде чем что-либо возвращать, поэтому ваш сервис, вероятно, займет много времени, чтобы ответить, если это ваша проблема.Если вам нужны асинхронные возможности, вы можете взглянуть на Spring * @EnableAsync & @Async.

@ EnableAsync

...