Я использую пружинную партию с версией 4.2.1. ВЫПУСК. У меня есть одна работа, которую я запускаю несколько раз с разными параметрами работы. При выполнении синхронных заданий все работает нормально. Когда я пытаюсь выполнить задания асинхронно, я получаю исключение NullPointerException при попытке получить некоторое значение из контекста выполнения в CustomItemWriter. Задания планируются планировщиком заданий.
CustomItemWriter
public class CustomWriter ItemWriter<T>, StepExecutionListener {
private CustomObject customObject;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext();
customObject = (CustomObject) Objects.requireNonNull(executionContext.get("custom_object")); -> Exception when running asynchronous
}
@Override
public void write(List<? extends CustomWriter> list) {
// do something
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return ExitStatus.COMPLETED;
}
}
BatchConfig
public class BatchConfig extends DefaultBatchConfigurer {
@Autowired
@Qualifier("batchDataSource")
private DataSource batchDataSource;
@Autowired
private PlatformTransactionManager platformTransactionManager;
@Autowired
private JobRepository jobRepository;
@Override
@Autowired
public void setDataSource(@Qualifier("batchDataSource") final DataSource batchDataSource) {
super.setDataSource(batchDataSource);
}
@Bean
public BatchDataSourceInitializer batchDataSourceInitializer(
@Qualifier("batchDataSource") final DataSource batchDataSource, final ResourceLoader resourceLoader) {
return new BatchDataSourceInitializer(batchDataSource, resourceLoader, new BatchProperties());
}
@Override
public JobRepository createJobRepository() throws Exception {
Jackson2ExecutionContextStringSerializer defaultSerializer = new Jackson2ExecutionContextStringSerializer();
defaultSerializer.setObjectMapper(JsonUtils.getObjectMapper());
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(batchDataSource);
factory.setTransactionManager(platformTransactionManager);
factory.setSerializer(defaultSerializer);
factory.afterPropertiesSet();
return factory.getObject();
}
@Bean
public JobLauncher asyncJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}
MainClass
@EnableScheduling
@EnableAsync
@SpringBootApplication
@Import({BatchConfig.class, DataSourceConfig.class, MainJobConfig.class})
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
Исключение (возникает только в том случае, если выполнение заданий асинхронно)
java.lang.NullPointerException: null
at java.util.Objects.requireNonNull(Objects.java:203)
at com.aldisued.tcmapp.chunks.writer.CustomItemWriter.beforeStep(CustomItemWriter.java:22)
at org.springframework.batch.core.listener.CompositeStepExecutionListener.beforeStep(CompositeStepExecutionListener.java:79)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:204)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
at java.lang.Thread.run(Thread.java:748)
JobScheduler
@Component
public class JobScheduler {
@Autowired
private Environment environment;
@Autowired
@Qualifier("asyncJobLauncher")
private JobLauncher jobLauncher;
@Autowired
private Job job;
@Scheduled(cron = "${job.cron.expr}")
public void runJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException,
JobRestartException, JobInstanceAlreadyCompleteException, InterruptedException {
runJobs(getStringList();
}
private void runJobs(List<String> stringList) throws JobParametersInvalidException,
JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
for (String string : stringList) {
JobParameters params = new JobParametersBuilder()
.addDate("exec_time", new Date())
.addString("exec_time", string)
.toJobParameters();
jobLauncher.run(job, params);
}
}
private List<Strings> getStringList() {
// get some strings
}
}
Как сделать потокобезопасным выполнение или что я делаю неправильно?