Я выполняю пакетное задание, которое считывает данные из таблиц Employee
и Payments
, обновляет как Payments
, так и Employee
в процессоре и сохраняет их.
Когда я запускаю это с одним потоком, тогда он работает нормально, но с многопоточными шагами он выдает исключение из метода закрытия в считывателе.
Моя пакетная конфигурация:
@Configuration
public class EmployeeBatchConfig {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private EntityManagerFactory entityManagerFactory;
@Autowired
private EmployeeProcessor employeeProcessor;
@Bean
@StepScope
public JpaPagingItemReader<Employee> employeePaymentItemReader() {
return new JpaPagingItemReaderBuilder<Employee>().pageSize(100)
.name("employeePaymentReader")
.saveState(false)
.entityManagerFactory(entityManagerFactory)
.queryString("select distinct e from Employee e JOIN e.payments ep where " +
"ep.processStatus = com.empcomp.empcomp.model.enumeration.ProcessingStatus.QFI" +
" OR ep.processStatus = com.empcomp.empcomp.model.enumeration.ProcessingStatus.PFR")
.build();
}
private JpaItemWriter<Employee> employeePaymentJpaItemWriter() {
return new JpaItemWriterBuilder().entityManagerFactory(entityManagerFactory).build();
}
@Bean()
public Step employeePaymentStep() {
return stepBuilderFactory.get("Employee Payment Processing Step")
.<Employee, Employee>chunk(1)
.reader(employeePaymentItemReader())
.processor(employeeProcessor)
.writer(employeePaymentJpaItemWriter())
.taskExecutor(taskExecutor()).throttleLimit(5)
.build();
}
@Bean
public Job employeePaymentJob() {
return jobBuilderFactory.get("Employee Payment Processing Step")
.incrementer(new RunIdIncrementer())
.start(employeePaymentStep())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
}
Журналы ошибок:
org.springframework.batch.item.ItemStreamException: Error while closing item reader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.close(AbstractItemCountingItemStreamItemReader.java:142) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.batch.item.database.JpaPagingItemReader$$EnhancerBySpringCGLIB$$7db2e151.close(<generated>) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.item.support.CompositeItemStream.close(CompositeItemStream.java:89) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.close(TaskletStep.java:306) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:286) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at com.sun.proxy.$Proxy100.run(Unknown Source) ~[na:na]
at com.empcomp.empcomp.service.PaymentService.runPaymentJob(PaymentService.java:34) ~[classes/:na]
at com.empcomp.empcomp.controller.JobController.profileJobRun(JobController.java:19) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) ~[spring-web-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) ~[spring-web-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105) ~[spring-webmvc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:879) ~[spring-webmvc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:793) ~[spring-webmvc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) ~[spring-webmvc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) ~[spring-webmvc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:634) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.34.jar:9.0.34]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:373) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.34.jar:9.0.34]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.lang.NullPointerException: null
at org.hibernate.engine.internal.StatefulPersistenceContext.clear(StatefulPersistenceContext.java:236) ~[hibernate-core-5.4.15.Final.jar:5.4.15.Final]
at org.hibernate.internal.SessionImpl.cleanupOnClose(SessionImpl.java:521) ~[hibernate-core-5.4.15.Final.jar:5.4.15.Final]
at org.hibernate.internal.AbstractSharedSessionContract.setClosed(AbstractSharedSessionContract.java:353) ~[hibernate-core-5.4.15.Final.jar:5.4.15.Final]
at org.hibernate.internal.AbstractSharedSessionContract.close(AbstractSharedSessionContract.java:346) ~[hibernate-core-5.4.15.Final.jar:5.4.15.Final]
at org.hibernate.internal.SessionImpl.closeWithoutOpenChecks(SessionImpl.java:366) ~[hibernate-core-5.4.15.Final.jar:5.4.15.Final]
at org.hibernate.internal.SessionImpl.close(SessionImpl.java:351) ~[hibernate-core-5.4.15.Final.jar:5.4.15.Final]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerInvocationHandler.invoke(ExtendedEntityManagerCreator.java:366) ~[spring-orm-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at com.sun.proxy.$Proxy94.close(Unknown Source) ~[na:na]
at org.springframework.batch.item.database.JpaPagingItemReader.doClose(JpaPagingItemReader.java:232) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.close(AbstractItemCountingItemStreamItemReader.java:139) ~[spring-batch-infrastructure-4.2.2.RELEASE.jar:4.2.2.RELEASE]
Пожалуйста, предложите, если мне не хватает какой-либо конфигурации для многопоточных шагов.