Я пытаюсь выполнить сложный поток работ, выполненных в Spring Batch, используя комбинацию многопоточных шагов и параллельных заданий.
Сейчас я создал 3 задания (1, 2, 3), первое из которых (1) выполняется раньше остальных (как и ожидалось) и завершает без проблем. Два других (2,3) должны работать параллельно, имея несколько собственных параллельных шагов. Все эти задания, которые я пытаюсь запустить, инкапсулируются в JobSteps, а затем выполняются в основном задании (0).
Проблема возникает только на заданиях 2 и 3, где происходит сбой некоторого JobStep, не всегда в одной и той же точке, не всегда в одном и том же JobStep. Это трассировка стека такого исключения:
2019-01-07 17:35:57,513 ERROR: o.s.b.c.s.AbstractStep [SimpleAsyncTaskExecutor-10] Encountered an error executing step 2 in job0
org.springframework.dao.DataAccessResourceFailureException: Could not increment identity; nested exception is java.sql.SQLTransactionRollbackException: transaction rollback: serialization failure at org.springframework.jdbc.support.incrementer.AbstractIdentityColumnMaxValueIncrementer.getNextKey(AbstractIdentityColumnMaxValueIncrementer.java:113) ~[spring-jdbc-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.jdbc.support.incrementer.AbstractDataFieldMaxValueIncrementer.nextLongValue(AbstractDataFieldMaxValueIncrementer.java:128) ~[spring-jdbc-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.batch.core.repository.dao.JdbcJobExecutionDao.saveJobExecution(JdbcJobExecutionDao.java:151) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:145) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_60]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_60]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_60]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_60]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:172) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at com.sun.proxy.$Proxy111.createJobExecution(Unknown Source) ~[?:?]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_60]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_60]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_60]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_60]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at com.sun.proxy.$Proxy181.run(Unknown Source) ~[?:?]
at org.springframework.batch.core.step.job.JobStep.doExecute(JobStep.java:117) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:93) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:90) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_60]
at org.springframework.core.task.SimpleAsyncTaskExecutor$ConcurrencyThrottlingRunnable.run(SimpleAsyncTaskExecutor.java:271) [spring-core-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_60]
Caused by: java.sql.SQLTransactionRollbackException: transaction rollback: serialization failure at org.hsqldb.jdbc.JDBCUtil.sqlException(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.jdbc.JDBCUtil.sqlException(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.jdbc.JDBCStatement.fetchResult(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.jdbc.JDBCStatement.executeUpdate(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.springframework.jdbc.support.incrementer.AbstractIdentityColumnMaxValueIncrementer.getNextKey(AbstractIdentityColumnMaxValueIncrementer.java:110) ~[spring-jdbc-4.3.13.RELEASE.jar:4.3.13.RELEASE]
... 42 more
Caused by: org.hsqldb.HsqlException: transaction rollback: serialization failure at org.hsqldb.error.Error.error(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.error.Error.error(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.Session.handleAbortTransaction(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.Session.executeCompiledStatement(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.Session.executeDirectStatement(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.Session.execute(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.jdbc.JDBCStatement.fetchResult(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.jdbc.JDBCStatement.executeUpdate(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.springframework.jdbc.support.incrementer.AbstractIdentityColumnMaxValueIncrementer.getNextKey(AbstractIdentityColumnMaxValueIncrementer.java:110) ~[spring-jdbc-4.3.13.RELEASE.jar:4.3.13.RELEASE]
... 42 more
Я провел небольшое исследование, и этот тип ошибки обычно появляется, когда HSQLDB, который я использую для хранения информации о задании, не настроен для параллелизма. Однако я уже использую, казалось бы, хороший конфиг с режимом транзакции MVCC:
@Configuration
public class HSqlDbConfig {
@Primary
@Bean("hsqldbDataSource")
public DataSource hsqldbDataSource() {
final SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
dataSource.setDriver(new org.hsqldb.jdbcDriver());
dataSource.setUrl("jdbc:hsqldb:mem:mydb;sql.enforce_strict_size=true;hsqldb.tx=mvcc");
dataSource.setUsername("sa");
dataSource.setPassword("");
return dataSource;
}
}
Это мои фрагменты кода, которые я использую для настройки этих заданий. Начиная с основной работы (0):
@Bean
public Job job0(JobBuilderFactory jobBuilderFactory) {
getJobParameters();
jobLauncher = (JobLauncher) ctx.getBean("jobLauncher");
Flow job1 = getJob1();
Flow job2 = getJob2();
Flow job3 = getJob3();
Flow splitFlow = getSplitFlow(job2, job3);
return jobBuilderFactory.get("Master Job")
.incrementer(new RunIdIncrementer())
.start(job1)
.next(splitFlow)
.end()
.build();
}
Как получить поток для заданий 2 и 3:
private Flow getJob2() {
Job j2 = (Job) ctx.getBean("job2");
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
Step step0 = getJobStep(j2, extractor);
return new FlowBuilder<Flow>("job2")
.start(step0)
.build();
}
private Flow getJob3() {
Job j3 = (Job) ctx.getBean("job3");
Job j3k = (Job) ctx.getBean("job3K");
Job j3l = (Job) ctx.getBean("job3L");
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
Step step0 = getJobStep(j3, extractor);
Step step1 = getJobStep(j3k, params1);
Step step2 = getJobStep(j3k, params2);
Step step3 = getJobStep(j3l, params3);
Step step4 = getJobStep(j3l, params4);
Flow flow1 = new FlowBuilder<Flow>("flowJ3f1")
.start(step1)
.next(step2)
.build();
Flow flow2 = new FlowBuilder<Flow>("flowJ3f2")
.start(step3)
.next(step4)
.build();
return new FlowBuilder<Flow>("job3")
.start(step0)
.split(taskExecutor)
.add(flow1, flow2)
.build();
}
Оба метода getJobStep ():
private Step getJobStep(Job job, JobParametersExtractor extractor) {
return steps.get(job.getName())
.job(job)
.launcher(jobLauncher)
.parametersExtractor(extractor)
.build();
}
private Step getJobStep(Job job, JobParameters jobParameters) {
SimpleJobParametersExtractor extractor = new SimpleJobParametersExtractor();
extractor.setJobParameters(jobParameters);
return steps.get(job.getName())
.job(job)
.launcher(jobLauncher)
.parametersExtractor(extractor)
.build();
}
Идея состоит в том, чтобы заставить эту структуру работать, поскольку распараллеливание всех возможных задач является требованием для этого проекта, и оно должно быть достаточно надежным, чтобы добавлять другие параллельные задания, кроме 2 и 3. Кроме того, все задания и этапы имеют были протестированы без параллелизма, и они работают как задумано.
Я могу предоставить больше кода, если это необходимо. Прямо сейчас я нахожусь в тупике, поэтому мы ценим каждую помощь.
РЕДАКТИРОВАТЬ: Как предложил @MahmoudBenHassine, я настроил JobRepository, TransactionManager и JobLauncher (это решило мою первую проблему) следующим образом:
@Bean(name = "myTransactionManager")
public DataSourceTransactionManager transactionManager(@Qualifier("hsqldbDataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Bean(name = "myJobRepository")
public JobRepository jobRepository(@Qualifier("hsqldbDataSource") DataSource dataSource,
@Qualifier("myTransactionManager") DataSourceTransactionManager dataSourceTransactionManager) throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(dataSourceTransactionManager);
factory.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
factory.afterPropertiesSet();
return factory.getObject();
}
@Bean(name = "myJobLauncher")
public JobLauncher getJobLauncher(@Qualifier("myJobRepository") JobRepository jobRepository) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
Затем я запускаю задание 0 с новым JobLauncher, и это исключение возникает на JobSteps из job3 (3K & 3L) EDIT : у меня есть большая выдержка журнала для большего контекста происходящего:
2019-01-09 09:53:39,793 INFO: o.s.b.c.j.SimpleStepHandler [MainTaskExecutor11] Executing step: [3K]
2019-01-09 09:53:39,811 INFO: o.s.b.c.l.s.SimpleJobLauncher [MainTaskExecutor11] Job: [FlowJob: [name=3K]] launched with the following parameters: [{process=Job 3K BAR, pos_cod=BAR, per_event=EVENT, isRet=false, UNIQUE=-2347943936040182027}]
2019-01-09 09:53:39,821 INFO: o.s.b.c.j.SimpleStepHandler [MainTaskExecutor11] Executing step: [[3K] Job 3K]
2019-01-09 09:53:39,822 INFO: e.i.l.d.l.StepListener [MainTaskExecutor11] Executing Step: [3K] Job 3K
2019-01-09 09:53:41,120 INFO: e.i.l.d.l.StepListener [MainTaskExecutor11] Write Count: 0
2019-01-09 09:53:41,166 INFO: o.s.b.c.l.s.SimpleJobLauncher [MainTaskExecutor11] Job: [FlowJob: [name=3K]] completed with the following parameters: [{process=Job 3K BAR, pos_cod=BAR, per_event=EVENT, isRet=false, UNIQUE=-2347943936040182027}] and the following status: [COMPLETED]
2019-01-09 09:53:41,189 INFO: o.s.b.c.j.SimpleStepHandler [MainTaskExecutor11] Duplicate step [3K] detected in execution of job=[job 0]. If either step fails, both will be executed again on restart.
2019-01-09 09:53:41,191 INFO: o.s.b.c.j.SimpleStepHandler [MainTaskExecutor11] Executing step: [3K]
2019-01-09 09:53:41,201 ERROR: o.s.b.c.s.AbstractStep [MainTaskExecutor11] Encountered an error executing step 3K in job 0
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={process=Job 3K BAR, pos_cod=BAR, per_event=EVENT, isRet=false, UNIQUE=-2347943936040182027}. If you want to run this job again, change the parameters.
at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:126) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_60]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_60]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_60]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_60]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:172) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at com.sun.proxy.$Proxy111.createJobExecution(Unknown Source) ~[?:?]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.step.job.JobStep.doExecute(JobStep.java:117) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:93) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:90) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_60]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_60]
Обратите внимание, что я использую различные параметры JobParameters в getJob3()
для каждого выполнения задания 3K и 3L (одно и то же исключение выдается и для этого задания). Я использую следующие JobParameters: EDIT : я включил функцию Random для включения уникального JobParameter:
Random randomizer = new Random(System.currentTimeMillis());
params1 = new JobParametersBuilder()
.addString(PROCESS, "Job 3K BAR" )
.addString("pos_cod", "BAR")
.addString("per_event", "EVENT")
.addString(IS_RET, FALSE)
.addLong("UNIQUE", randomizer.nextLong())
.toJobParameters();
params2 = new JobParametersBuilder()
.addString(PROCESS, "Job 3K 704" )
.addString("pos_cod", "704")
.addString("per_event", "EVENT")
.addString(IS_RET, FALSE)
.addLong("UNIQUE", randomizer.nextLong())
.toJobParameters();
params3 = new JobParametersBuilder()
.addString(PROCESS, "Job 3L BAR" )
.addString("pos_cod", "BAR")
.addString("per_event", "RET_EVENT")
.addString(IS_RET, FALSE)
.addLong("UNIQUE", randomizer.nextLong())
.toJobParameters();
params4 = new JobParametersBuilder()
.addString(PROCESS, "Job 3L 704" )
.addString("pos_cod", "704")
.addString("per_event", "RET_EVENT")
.addString(IS_RET, FALSE)
.addLong("UNIQUE", randomizer.nextLong())
.toJobParameters();
Видя, что повторяющаяся ошибка экземпляра задания продолжает возникать, когда JobLauncher пытается запустить задание samen (не его одноуровневое сопоставление с другими JobParameters), я более склонен думать, что это проблема с моим JobRepository, но это ничего больше, чем спекуляция.