Уверен, что кто-то уже сталкивался с этой проблемой и решил её, но мне не хватает знания Spring, чтобы разобраться самому. У меня есть приложение на Spring Boot (2.2.4), в котором Spring Batch читает набор записей из текстового файла и импортирует их в базу данных MySQL. Всё работало отлично, но теперь появилось требование сделать приложение мультитенантным. Мне удалось сделать само приложение Spring Boot мультитенантным, передавая идентификатор арендатора в заголовке (X-TENANT-ID), однако после этого Spring Batch перестал работать. Почему? Вот ошибка, которую я получаю: org.springframework.dao.EmptyResultDataAccessException: Incorrect result size: expected 1, actual 0
Полное исключение:
org.springframework.dao.EmptyResultDataAccessException: Incorrect result size: expected 1, actual 0
at org.springframework.dao.support.DataAccessUtils.nullableSingleResult(DataAccessUtils.java:97) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
DataAccessUtils.java:97
at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:784) ~[spring-jdbc-5.2.3.RELEASE.jar:5.2.3.RELEASE]
JdbcTemplate.java:784
at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:809) ~[spring-jdbc-5.2.3.RELEASE.jar:5.2.3.RELEASE]
JdbcTemplate.java:809
at org.springframework.batch.core.repository.dao.JdbcJobExecutionDao.synchronizeStatus(JdbcJobExecutionDao.java:308) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
JdbcJobExecutionDao.java:308
at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:166) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
SimpleJobRepository.java:166
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_212]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_212]
NativeMethodAccessorImpl.java:62
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_212]
DelegatingMethodAccessorImpl.java:43
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_212]
Method.java:498
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
AopUtils.java:344
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
ReflectiveMethodInvocation.java:198
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
ReflectiveMethodInvocation.java:163
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:366) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
TransactionAspectSupport.java:366
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:99) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
TransactionInterceptor.java:99
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
ReflectiveMethodInvocation.java:186
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
JdkDynamicAopProxy.java:212
at com.sun.proxy.$Proxy182.update(Unknown Source) ~[na:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_212]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_212]
NativeMethodAccessorImpl.java:62
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_212]
DelegatingMethodAccessorImpl.java:43
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_212]
Method.java:498
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
AopUtils.java:344
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
ReflectiveMethodInvocation.java:198
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
ReflectiveMethodInvocation.java:163
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
SimpleBatchConfiguration.java:127
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
ReflectiveMethodInvocation.java:186
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
JdkDynamicAopProxy.java:212
at com.sun.proxy.$Proxy215.update(Unknown Source) ~[na:na]
at org.springframework.batch.core.job.AbstractJob.updateStatus(AbstractJob.java:440) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
AbstractJob.java:440
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:314) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
SimpleJobLauncher.java:147
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Thread.java:748
Дополнение: идентификатор арендатора я устанавливаю с помощью HandlerInterceptorAdapter:
public class TenantInterceptor extends HandlerInterceptorAdapter {
private String defaultHeader = "X-TENANT-ID";
private String defaultTenantId = "default";
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
log.debug("TenantInterceptor::preHandle");
log.debug("Default Header: " + defaultHeader);
log.debug("Default Tenant ID: " + defaultTenantId);
try {
String tenantId = defaultTenantId;
if (defaultHeader.equalsIgnoreCase("Authorization")) {
log.debug("Using the 'Authorization' header so we need to get the value from the JWT");
String authToken = request.getHeader(defaultHeader);
try {
tenantId = JwtUtil.getClaim(JwtUtil.getTokenFromBearer(authToken).get(), "tenantId");
} catch (Exception ex) {
ex.printStackTrace();
}
} else if (defaultHeader.equalsIgnoreCase("X-TENANT-ID")) {
log.debug("Using the 'X-TENANT-ID' header so just get the value from it.");
tenantId = request.getHeader(defaultHeader);
log.debug("TenantID: " + tenantId);
if (tenantId == null || tenantId.isEmpty()) {
tenantId = defaultTenantId;
}
}
// TODO: Security checks
// We need to ensure that the user making the request has the permissions
// to make a request for the this tenant (Client/Customer)!
TenantContext.setCurrentTenant(tenantId);
} catch (Exception ex) {
return false;
}
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
ModelAndView modelAndView) throws Exception {
TenantContext.clear();
}
}
TenantContext
/**
 * Holds the id of the tenant that the current thread is working for.
 *
 * <p>The value is stored in a plain {@link ThreadLocal}, so it is visible only on the
 * thread that set it. Code that continues on another thread must set the tenant there
 * itself.
 */
public class TenantContext {

    /** Per-thread tenant id; {@code final} because the holder itself is never replaced. */
    private static final ThreadLocal<String> currentTenant = new ThreadLocal<>();

    /**
     * @return the tenant id set on the calling thread, or {@code null} if none is set
     */
    public static String getCurrentTenant() {
        return currentTenant.get();
    }

    /**
     * Sets the tenant id for the calling thread.
     *
     * @param tenant the tenant id to associate with the current thread
     */
    public static void setCurrentTenant(String tenant) {
        log.debug("Setting tenant to " + tenant);
        currentTenant.set(tenant);
    }

    /**
     * Removes the tenant id from the calling thread. Uses {@code remove()} rather than
     * {@code set(null)} so the thread does not keep an empty entry for this variable,
     * which matters on pooled threads that outlive a single request.
     */
    public static void clear() {
        currentTenant.remove();
    }
}
И мой BatchConfig:
/**
 * Bean definitions for the wheel import batch job.
 *
 * <p>The job has two steps: {@code wheelImport} loads {@code WheelItem} records from a
 * fixed-width text file, and {@code jurorImport} reads those records back through JPA and
 * turns them into {@code Juror} entities on a pool of worker threads.
 *
 * <p>NOTE(review): the job launcher and the {@code jurorImport} step both run work on
 * threads other than the one that called the launcher. State kept in a {@code ThreadLocal}
 * on the calling thread (such as the tenant id in {@code TenantContext}) is not visible on
 * those threads; if the data source is chosen from it, job-repository updates may go to a
 * different database than the one the job execution was created in — confirm against the
 * data-source routing configuration.
 */
public class BatchConfig {

    @Autowired
    JobRepository jobRepository;

    @Autowired
    EntityManagerFactory emf;

    @Autowired
    WheelItemDao wheelItemDao;

    @Autowired
    DuplicateJurorDao dupJurorDao;

    @Autowired
    JurorDao jurorDao;

    @Autowired
    EventDao eventDao;

    // @Autowired
    // DataSource dataSource;

    /**
     * Supplies a batch configurer whose transaction manager is JPA-based.
     *
     * @param emf entity manager factory the transaction manager is bound to
     */
    @Bean
    public BatchConfigurer batchConfigurer(EntityManagerFactory emf) {
        // Built once so that every call to getTransactionManager() returns the same
        // manager instead of a fresh instance per call.
        final PlatformTransactionManager jpaTransactionManager = new JpaTransactionManager(emf);
        return new DefaultBatchConfigurer() {
            @Override
            public PlatformTransactionManager getTransactionManager() {
                return jpaTransactionManager;
            }
        };
    }

    /**
     * Defines the {@code wheelImportJob}: the file import step followed by the juror
     * import step.
     *
     * @param jobs factory for the job builder
     * @param steps factory for the step builders
     * @param jurorProcessor converts a {@code WheelItem} into a {@code Juror}
     * @param wheelProcessor processes a {@code WheelItem} before it is written
     * @param listener job-level listener invoked before and after the job
     */
    @Bean
    public Job wheelImportJob(JobBuilderFactory jobs, StepBuilderFactory steps,
            ItemProcessor<WheelItem, Juror> jurorProcessor, ItemProcessor<WheelItem, WheelItem> wheelProcessor,
            JobExecutionListener listener) {
        // @formatter:off
        // reader(null): the argument is a placeholder; the reader bean is step-scoped and
        // declares its path as the job parameter "fullPathFileName".
        Step wheelImport = steps.get("wheelImport")
                .<WheelItem, WheelItem>chunk(100)
                .reader(reader(null))
                .processor(wheelProcessor)
                .writer(wheelWriter())
                .build();
        // Multi-threaded step: chunks of one item, executed on taskExecutor().
        Step jurorImport = steps.get("jurorImport")
                .<WheelItem, Juror>chunk(1)
                .reader(wheelItemReader())
                .processor(jurorProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .throttleLimit(125)
                .build();
        return jobs.get("wheelImportJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .start(wheelImport)
                .next(jurorImport)
                .build();
        // @formatter:on
    }

    /** JPA writer for {@code Juror} entities. */
    @Bean
    public JpaItemWriter<Juror> writer() {
        JpaItemWriter<Juror> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(emf);
        return writer;
    }

    /** JPA writer for {@code WheelItem} entities. */
    @Bean
    public JpaItemWriter<WheelItem> wheelWriter() {
        JpaItemWriter<WheelItem> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(emf);
        return writer;
    }

    /**
     * JPA paging reader over all {@code WheelItem} rows, five per page.
     *
     * @throws IllegalStateException if the reader cannot be initialized
     */
    @Bean
    public ItemReader<WheelItem> wheelItemReader() {
        JpaPagingItemReader<WheelItem> reader = new JpaPagingItemReader<>();
        reader.setEntityManagerFactory(emf);
        reader.setQueryString("select w from WheelItem w");
        reader.setPageSize(5);
        try {
            reader.afterPropertiesSet();
        } catch (Exception e) {
            // Fail at configuration time instead of handing out an uninitialized reader.
            throw new IllegalStateException("Failed to initialize the WheelItem JPA paging reader", e);
        }
        return reader;
    }

    /**
     * Job listener that records start/finish events and resets import state before a run.
     */
    @Bean
    public JobExecutionListener listener() {
        return new JobExecutionListener() {
            @Override
            public void beforeJob(JobExecution jobExecution) {
                recordEvent(EventType.RECONTITUTE_WHEEL_STARTED);
                wheelItemDao.truncateTable();
                dupJurorDao.truncateTable();
                // De-activate all current Jurors.
                jurorDao.updateActive(false);
                JurorProcessor.counter = 0;
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                recordEvent(EventType.RECONTITUTE_WHEEL_FINISHED);
            }
        };
    }

    /** Saves an active event of the given type, stamped with the current time. */
    private void recordEvent(EventType type) {
        // One timestamp for both columns so created and updated are identical.
        LocalDateTime now = LocalDateTime.now();
        // @formatter:off
        eventDao.save(
                Event.builder()
                    .active(true)
                    .eventType(type.getId())
                    .note(type.getLabel())
                    .createdDt(now)
                    .updatedDt(now)
                    .build());
        // @formatter:on
    }

    /** Chunk-level listener bean. */
    @Bean
    public ChunkExecutionListener chunkListener() {
        return new ChunkExecutionListener();
    }

    /**
     * Step-scoped flat-file reader for the uploaded wheel file.
     *
     * @param pathToFile value of the {@code fullPathFileName} job parameter
     */
    @Bean
    @StepScope
    public synchronized FlatFileItemReader<WheelItem> reader(
            @Value("#{jobParameters[fullPathFileName]}") String pathToFile) {
        FlatFileItemReader<WheelItem> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setResource(new FileSystemResource(pathToFile));
        flatFileItemReader.setName("Wheel-Reader");
        flatFileItemReader.setLineMapper(lineMapper());
        return flatFileItemReader;
    }

    /**
     * Maps one fixed-width line of the wheel file onto a {@code WheelItem}; the tokenizer
     * runs in non-strict mode.
     */
    @Bean
    public LineMapper<WheelItem> lineMapper() {
        DefaultLineMapper<WheelItem> defaultLineMapper = new DefaultLineMapper<>();
        FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
        tokenizer.setStrict(false);
        // @formatter:off
        tokenizer.setNames(
                ...
        );
        tokenizer.setColumns(
                ...
        );
        // @formatter:on
        defaultLineMapper.setLineTokenizer(tokenizer);
        BeanWrapperFieldSetMapper<WheelItem> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(WheelItem.class);
        defaultLineMapper.setFieldSetMapper(fieldSetMapper);
        return defaultLineMapper;
    }

    /** Thread pool for the multi-threaded {@code jurorImport} step (core 100, max 150). */
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(100);
        taskExecutor.setMaxPoolSize(150);
        taskExecutor.initialize();
        return taskExecutor;
    }

    /**
     * Job launcher that hands each job to a {@code SimpleAsyncTaskExecutor}, so the job
     * runs on a different thread than the caller of {@code run(...)}.
     */
    @Bean(name = "myJobLauncher")
    public JobLauncher simpleJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}
Пакетное задание запускается HTTP-запросом к моему RestController:
JobExecution jobExec = jobLauncher.run(
job,
new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("fullPathFileName", UPLOADED_FOLDER + UPLOADED_FILE).toJobParameters());