Я добавляю слушателя к чанку, чтобы выполнить какую-то работу, и, если он потерпел неудачу, я ожидал, что вся работа не удалась, и откат вставил в базу данных в ItemWriter. Но это не откат. Я думаю, либо @BeforeChunk и @AfterChunk находятся вне чанка, поэтому они не находятся внутри одной транзакции, либо @AfterChunkError просто перехватывает ошибку, и мне следует распространить эту ошибку, чтобы сообщить заданию, что это ошибка, и откатить все.
Он развернут под Weblogic 12.2.1.3.0, и я использую Spring Boot 2.3.0. В Weblogic для источника данных я использую имя класса драйвера oracle.jdbc.replay.OracleXADataSourceImpl. Мой pom-файл выглядит так:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
...
<dependency>
<groupId>com.oracle.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>12.2.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
...
</project>
application.properties:
spring.batch.job.enabled=false
spring.datasource.jndi-name=jdbc/NameDS
spring.datasource.driver-class-name=oracle.jdbc.OracleDriver
spring.batch.initialize-schema=never
Моя конфигурация:
@Autowired
DataSource dataSource;
@Autowired
Processor processor;
@Autowired
Writer writer;
...
private Properties getJNDiProperties() {
final Properties jndiProps = new Properties();
jndiProps.setProperty(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
return jndiProps;
}
@Bean
public JndiTemplate jndiTemplate() {
final JndiTemplate jndiTemplate = new JndiTemplate();
jndiTemplate.setEnvironment(getJNDiProperties());
return jndiTemplate;
}
@Bean
public JdbcCursorItemReader<Object> itemsReader() {
JdbcCursorItemReader<Object> customerJdbcCursorItemReader = new JdbcCursorItemReader<>();
customerJdbcCursorItemReader.setDataSource(dataSource);
customerJdbcCursorItemReader.setSql(env.getProperty("select * from table"));
customerJdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Object.class));
return customerJdbcCursorItemReader;
}
@Bean
public Job registrationChunkJob() {
return jobBuilderFactory.get("jobRegister")
.incrementer(new RunIdIncrementer())
.flow(step()).end().build();
}
@Bean
TaskExecutor taskExecutorStepPush() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(2);
taskExecutor.setMaxPoolSize(20);
taskExecutor.setQueueCapacity(4);
taskExecutor.setAllowCoreThreadTimeOut(true);
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setThreadNamePrefix(LoggingUtil.getWeblogicName() + "-");
return taskExecutor;
}
@Bean
public Step step() {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.READ_COMMITTED.value());
return stepBuilderFactory.get("stepRegister").<Object, Object>chunk(50)
.reader(itemsReader())
.processor(processor)
.writer(writer)
.listener(chunkTransaction)
.taskExecutor(taskExecutorStepPush())
.throttleLimit(1)
.transactionAttribute(attribute)
.build();
}
Класс ChunkTransaction выглядит следующим образом:
@Slf4j
@Component
public class ChunkTransaction {
....
@BeforeChunk
public void onChunkStart(ChunkContext context) throws IOException {
log.debug("onChunkStart");
// do something
}
@AfterChunk
public void onChunkEnd(ChunkContext context) throws IOException {
log.debug("onChunkEnd");
// HERE IS AN ERROR!!!!!!!!!
}
@AfterChunkError
public void onChunkError(ChunkContext context) throws IOException {
log.debug("onChunkError");
// do something
}
}
Класс процессора выглядит следующим образом:
@Slf4j
@Component
public class Processor implements ItemProcessor<Object Object> {
@Override
public Object process(Object object) throws Exception {
log.debug("Processing");
return object;
}
}
Класс Writer выглядит следующим образом:
@Slf4j
@Component
public class Writer implements ItemWriter<Object Object> {
@Override
public void write(List<? extends Object> list) {
log.debug("-------------------- WRITE TO DATABASE --------------------");
SqlParameterSource[] batch = SqlParameterSourceUtils.createBatch(list.toArray());
int[] updateCounts = namedParameterJdbcTemplate.batchUpdate("update table set COLUMN1= :column1, COLUMN2 = :column2 where ID = :id", batch);
log.info(" Updated " + updateCounts.length + " successfully");
}
}
И задание запускается каждые 5 минут, через 2 минуты после запуска приложения
@Component
@Slf4j
public class JobMain {
@Autowired
JobLauncher jobLauncher;
@Retryable
@Scheduled(initialDelay = 120000, fixedRate = 300000)
public void launchJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobRegister", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
}
Это журнал:
[ AdminServer-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [stepRegister]
[ AdminServer-1] c.e.c.l.CocoaChunkTransactionIntegration : onChunkStart
[ AdminServer-1] e.e.e.c.c.e.c.chunk.Processor : Processing
...
[ AdminServer-1] e.e.e.c.c.e.core.chunk.DocumentWriter : -------------------- WRITE TO DATABASE --------------------
[ AdminServer-1] e.e.e.c.c.e.core.chunk.DocumentWriter : Updated 50 successfully
[ AdminServer-1] c.e.c.l.CocoaChunkTransactionIntegration : onChunkEnd <-- AFTER THIS IS AN ERROR!!!!!
[ AdminServer-2] c.e.c.l.CocoaChunkTransactionIntegration : onChunkStart <-- HERE IS NEW THREAD HALF SECOND AFTER "onChunkEnd" log !!!!
[ AdminServer-2] c.e.c.l.CocoaChunkTransactionIntegration : onChunkError <-- THIS ERROR IS IN THAT NEW THREAD
Хорошо, как я и думал. (https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/core/ChunkListener.html):
afterChunk
Callback after the chunk is executed, outside the transaction.
Итак, я могу сделать что-то вместо этого в @AfterChunkметод внутри метода ItemWriter, и в случае его сбоя возникнет исключение, и это вызовет откат задания, и мне нужно отловить эту ошибку, чтобы откатить вручную эту другую транзакцию.
Я надеюсь, что есть более красивый способ добавить другиетранзакция для этой транзакции. Добавление аннотации @Transactional к классу слушателя чанка не работает.
Итак, я могу получить #chunkContext внутри класса Writer?