Spring Batch с SimpleAsyncTaskExecutor не сохраняется в БД - PullRequest
0 голосов
/ 02 мая 2020

У меня есть простое приложение Spring Boot с enpoint, которое асинхронно вызывает Spring Batch Job через SimpleAsyncTaskExecutor, настроенный в bean-компоненте JobLauncher.

Spring Batch Job запускается асинхронно и работает нормально, но ничего не сохраняется в база данных.

Если я удаляю SimpleAsyncTaskExecutor, данные сохраняются.

Это мой BatchConfigurer. Здесь я настраиваю JobLauncher с помощью SimpleAsyncTaskExecutor. Если я удалю строку simpleJobLauncher.setTaskExecutor (new SimpleAsyncTaskExecutor ()); // (1) данные сохранены.

@Component
public class CustomBatchConfiguration implements BatchConfigurer {

private static final Log LOGGER = LogFactory.getLog(CustomBatchConfiguration.class);

@Autowired
private BatchProperties properties;

@Autowired
private DataSource dataSource;

@Autowired
private EntityManagerFactory entityManagerFactory;

private PlatformTransactionManager transactionManager;

private JobRepository jobRepository;

private JobLauncher jobLauncher;

private JobExplorer jobExplorer;

/**
 * Registers {@link JobRepository} bean.
 */
@Override
public JobRepository getJobRepository() {
    return this.jobRepository;
}

/**
 * Registers {@link PlatformTransactionManager} bean.
 */
@Override
public PlatformTransactionManager getTransactionManager() {
    return this.transactionManager;
}

/**
 * Registers {@link JobLauncher} bean.
 */
@Override
public JobLauncher getJobLauncher() {
    return this.jobLauncher;
}

/**
 * Registers {@link JobExplorer} bean. This bean is actually created in
 * {@link BatchConfig}.
 */
@Override
public JobExplorer getJobExplorer() throws Exception {
    return this.jobExplorer;
}

/**
 * Initializes Spring Batch components.
 */
@PostConstruct
public void initialize() {
    try {
        this.transactionManager = createTransactionManager();
        this.jobRepository = createJobRepository();
        this.jobLauncher = createJobLauncher();
        this.jobExplorer = createJobExplorer();
    } catch (Exception ex) {
        throw new IllegalStateException("Unable to initialize Spring Batch", ex);
    }
}

private JobExplorer createJobExplorer() throws Exception {
    JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
    jobExplorerFactoryBean.setDataSource(this.dataSource);
    String tablePrefix = this.properties.getTablePrefix();

    if (StringUtils.hasText(tablePrefix)) {
        jobExplorerFactoryBean.setTablePrefix(tablePrefix);
    }

    jobExplorerFactoryBean.afterPropertiesSet();
    return jobExplorerFactoryBean.getObject();
}

private JobLauncher createJobLauncher() throws Exception {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();

    simpleJobLauncher.setJobRepository(getJobRepository());
    simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); // (1)
    simpleJobLauncher.afterPropertiesSet();

    return simpleJobLauncher;
}

private JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();

    jobRepositoryFactoryBean.setDatabaseType("db2");
    jobRepositoryFactoryBean.setDataSource(this.dataSource);

    if (this.entityManagerFactory != null) {
        LOGGER.warn("JPA does not support custom isolation levels, so locks may not be taken when launching Jobs");
        jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_DEFAULT");
    }

    String tablePrefix = this.properties.getTablePrefix();
    if (StringUtils.hasText(tablePrefix)) {
        jobRepositoryFactoryBean.setTablePrefix(tablePrefix);
    }

    jobRepositoryFactoryBean.setTransactionManager(getTransactionManager());
    jobRepositoryFactoryBean.afterPropertiesSet();

    return jobRepositoryFactoryBean.getObject();
}

private PlatformTransactionManager createTransactionManager() {
    return new DataSourceTransactionManager(dataSource);
}

}

И это моя BatchConfiguration.

@Autowired(required = true)
private MyItemReader myItemReader;

@Autowired(required = true)
private MyItemProcessor myItemProcessor;

@Autowired(required = true)
private MyItemWriter myItemWriter;


@Bean
public Step myStep(TaskExecutor taskExecutor) { 

    return stepBuilderFactory.get("myStepName")
            .<SomeWrapper, SomeWrapper>chunk(
                    1)
            .reader(myItemReader)
            .processor(myItemProcessor).writer(myItemWriter)

            .build();
}

@Bean(name = "myJob")
public Job myJob(Step myStep) {
    return jobBuilderFactory.get("myJobName").incrementer(new RunIdIncrementer())
            .flow(myStep).end().build();
}

Я что-то упустил?

Заранее спасибо

1 Ответ

0 голосов
/ 03 мая 2020

Я сделал следующее, чтобы решить мою проблему:

  1. В CustomBatchConfiguration Я изменил следующее

    private PlatformTransactionManager createTransactionManager() {        
        return new DataSourceTransactionManager(dataSource);
    }
    

на

private PlatformTransactionManager createTransactionManager() {
    if (this.entityManagerFactory != null) {
        return new JpaTransactionManager(this.entityManagerFactory);
    }

    return new DataSourceTransactionManager(this.dataSource);
}

Я использовал пружинный JpaTransactionManager.

Я изменяю JobLauncher TaskExecutor с

simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());

на

ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor();
threadPoolExecutor.setMaxPoolSize(springBatchJobThreadPollMaxSize);
threadPoolExecutor.afterPropertiesSet();

TaskExecutor taskExecutor = new DelegatingSecurityContextAsyncTaskExecutor(threadPoolExecutor);

Создание более надежного пула потоков и совместное использование контекста безопасности пружины их.

Я использовал тасклет вместо Spring Batch, ориентированный на чанки. Здесь Подробности.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...