Исключение в пакетном слушателе не выполняет откат транзакции задания - PullRequest
0 голосов
/ 05 ноября 2019

Я добавляю слушателя к чанку, чтобы выполнить какую-то работу, и, если он потерпел неудачу, я ожидал, что вся работа не удалась, и откат вставил в базу данных в ItemWriter. Но это не откат. Я думаю, либо @BeforeChunk и @AfterChunk находятся вне чанка, поэтому они не находятся внутри одной транзакции, либо @AfterChunkError просто перехватывает ошибку, и мне следует распространить эту ошибку, чтобы сообщить заданию, что это ошибка, и откатить все.

Он развернут под Weblogic 12.2.1.3.0, и я использую Spring Boot 2.3.0. В Weblogic для источника данных я использую имя класса драйвера oracle.jdbc.replay.OracleXADataSourceImpl. Мой pom-файл выглядит так:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.0.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
...
    <dependency>
        <groupId>com.oracle.jdbc</groupId>
        <artifactId>ojdbc8</artifactId>
        <version>12.2.0.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
...
</project>

application.properties:

spring.batch.job.enabled=false
spring.datasource.jndi-name=jdbc/NameDS
spring.datasource.driver-class-name=oracle.jdbc.OracleDriver
spring.batch.initialize-schema=never

Моя конфигурация:

@Autowired
DataSource dataSource;
@Autowired
Processor processor;
@Autowired
Writer writer;
...
private Properties getJNDiProperties() {
    final Properties jndiProps = new Properties();
    jndiProps.setProperty(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
    return jndiProps;
}

@Bean
public JndiTemplate jndiTemplate() {
    final JndiTemplate jndiTemplate = new JndiTemplate();
    jndiTemplate.setEnvironment(getJNDiProperties());
    return jndiTemplate;
}
@Bean
public JdbcCursorItemReader<Object> itemsReader() {
    JdbcCursorItemReader<Object> customerJdbcCursorItemReader = new JdbcCursorItemReader<>();
    customerJdbcCursorItemReader.setDataSource(dataSource);
    customerJdbcCursorItemReader.setSql(env.getProperty("select * from table"));
    customerJdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Object.class));
    return customerJdbcCursorItemReader;
}

@Bean
public Job registrationChunkJob() {
    return jobBuilderFactory.get("jobRegister")
            .incrementer(new RunIdIncrementer())
            .flow(step()).end().build();
}

@Bean
TaskExecutor taskExecutorStepPush() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(2);
    taskExecutor.setMaxPoolSize(20);
    taskExecutor.setQueueCapacity(4);
    taskExecutor.setAllowCoreThreadTimeOut(true);
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setThreadNamePrefix(LoggingUtil.getWeblogicName() + "-");
    return taskExecutor;
}
@Bean
public Step step() {
    DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
    attribute.setPropagationBehavior(Propagation.REQUIRED.value());
    attribute.setIsolationLevel(Isolation.READ_COMMITTED.value());

    return stepBuilderFactory.get("stepRegister").<Object, Object>chunk(50)
            .reader(itemsReader())
            .processor(processor)
            .writer(writer)
            .listener(chunkTransaction)
            .taskExecutor(taskExecutorStepPush())
            .throttleLimit(1)
            .transactionAttribute(attribute)
            .build();
}

Класс ChunkTransaction выглядит следующим образом:

@Slf4j
@Component
public class ChunkTransaction {
....
    @BeforeChunk
    public void onChunkStart(ChunkContext context) throws IOException {
    log.debug("onChunkStart");
    // do something
    }
    @AfterChunk
    public void onChunkEnd(ChunkContext context) throws IOException {
    log.debug("onChunkEnd");
    // HERE IS AN ERROR!!!!!!!!!
    }
    @AfterChunkError
    public void onChunkError(ChunkContext context) throws IOException {
    log.debug("onChunkError");
    // do something
    }
}

Класс процессора выглядит следующим образом:

@Slf4j
@Component
public class Processor implements ItemProcessor<Object Object> {
    @Override
    public Object process(Object object) throws Exception {
        log.debug("Processing");
        return object;
    }

}

Класс Writer выглядит следующим образом:

@Slf4j
@Component
public class Writer implements ItemWriter<Object Object> {
    @Override
    public void write(List<? extends Object> list) {
    log.debug("-------------------- WRITE TO DATABASE --------------------");
    SqlParameterSource[] batch = SqlParameterSourceUtils.createBatch(list.toArray());
    int[] updateCounts = namedParameterJdbcTemplate.batchUpdate("update table set COLUMN1= :column1, COLUMN2 = :column2 where ID = :id", batch);
    log.info(" Updated " + updateCounts.length + " successfully");
    }

}

И задание запускается каждые 5 минут, через 2 минуты после запуска приложения

@Component
@Slf4j
public class JobMain {
        @Autowired
        JobLauncher jobLauncher;

        @Retryable
        @Scheduled(initialDelay = 120000, fixedRate = 300000)
        public void launchJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        JobParameters jobParameters = new JobParametersBuilder()
            .addString("jobRegister", String.valueOf(System.currentTimeMillis()))
            .toJobParameters();
        jobLauncher.run(job, jobParameters);
}
}

Это журнал:

[  AdminServer-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [stepRegister]
[  AdminServer-1] c.e.c.l.CocoaChunkTransactionIntegration : onChunkStart
[  AdminServer-1] e.e.e.c.c.e.c.chunk.Processor    : Processing
...
[  AdminServer-1] e.e.e.c.c.e.core.chunk.DocumentWriter    : -------------------- WRITE TO DATABASE --------------------
[  AdminServer-1] e.e.e.c.c.e.core.chunk.DocumentWriter    :  Updated 50 successfully
[  AdminServer-1] c.e.c.l.CocoaChunkTransactionIntegration : onChunkEnd <-- AFTER THIS IS AN ERROR!!!!!
[  AdminServer-2] c.e.c.l.CocoaChunkTransactionIntegration : onChunkStart <-- HERE IS NEW THREAD HALF SECOND AFTER "onChunkEnd" log !!!!
[  AdminServer-2] c.e.c.l.CocoaChunkTransactionIntegration : onChunkError <-- THIS ERROR IS IN THAT NEW THREAD

Хорошо, как я и думал. (https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/core/ChunkListener.html):

afterChunk
Callback after the chunk is executed, outside the transaction.

Итак, я могу сделать что-то вместо этого в @AfterChunkметод внутри метода ItemWriter, и в случае его сбоя возникнет исключение, и это вызовет откат задания, и мне нужно отловить эту ошибку, чтобы откатить вручную эту другую транзакцию.

Я надеюсь, что есть более красивый способ добавить другиетранзакция для этой транзакции. Добавление аннотации @Transactional к классу слушателя чанка не работает.

Итак, я могу получить #chunkContext внутри класса Writer?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...