Тайм-аут соединения с БД при использовании AsyncItemProcessor и AsyncItemWriter - PullRequest
0 голосов
/ 31 августа 2018

Когда я пытаюсь использовать AsyncItemProcessor и AsyncItemWriter, пишущий, кажется, ничего не делает. После того, как процессор обработает все записи и поместит их в блок, писатель ничего не делает, и время соединения истекло. Ниже моя конфигурация.

@Configuration
@EnableAutoConfiguration
public class EmployeeJobConfiguration {

    @Autowired
    private EmployeeService employeeService;

    /**
     * Default Constructor
     */
    public EmployeeJobConfiguration() {
        super();
    }

    @Bean
    public Job employeeJob() throws Exception {
        return jobBuilderFactory.get("employeeJob")
                .start(employeeJobStep1())
                .listener(executionListener())
                .build();
    }


    @SuppressWarnings("unchecked")
    @Bean
    public Step employeeJobStep1() throws Exception {
        return ((SimpleStepBuilder<Employee, EmployeeResult>) stepBuilderFactory.get("step1")
                .allowStartIfComplete(allowRestart)
                .<Employee, Future<EmployeeResult>>chunk(chunkSize)
                .reader(employeeJobReader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

    @StepScope
    @Bean
    public ItemReader<Employee> employeeJobReader(){
        RepositoryItemReader<Employee> reader = new RepositoryItemReader<>();
        reader.setRepository(employeeService.getRepository());
        reader.setPageSize(chunkSize);
        reader.setMethodName("findAll");
        reader.setSort(getSortMap(ASC, "employeeID"));
        return reader;
    }


    @Bean
    public ItemProcessor<Employee, Future<EmployeeResult>> asyncItemProcessor() throws Exception{
        AsyncItemProcessor<Employee, EmployeeResult> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(employeeProcessor());
        asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
        asyncItemProcessor.afterPropertiesSet();
        return asyncItemProcessor;
    }


     /**
     * Job Processor Method
     * @return
     */
    @Bean
    public ItemProcessor<Employee, EmployeeResult> employeeProcessor() {
        return new EmployeeProcessor();
    }

    @Bean(name = "asyncExecutor")
    public TaskExecutor getAsyncExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(10);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("AsyncExecutor-");
        return executor;
    }



    @Bean
    public ItemWriter<Future<EmployeeResult>> asyncItemWriter() throws Exception{
        AsyncItemWriter<EmployeeResult> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(employeeJobWriter());
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }

    @Bean
    public ItemWriter<EmployeeResult> employeeJobWriter() {
        return new EmployeeJobWriter();
    }

Я получаю следующее исключение.

2018-08-31 14:00:36 ОШИБКА [AsyncExecutor-1] cpcibeEmployeeErrorHandler - исключение [EclipseLink-4002] (Eclipse Persistence Services - 2.6.4.v20160829-44060b6): org.eclipse.persistence.exceptions. DatabaseException

Внутреннее исключение: java.sql.SQLTransientConnectionException: springHikariCP - Соединение недоступно, время запроса истекло после 30000 мс. Код ошибки: 0 Запрос: ReadObjectQuery (referenceClass = ProcessError) org.eclipse.persistence.exceptions.DatabaseException: Внутреннее исключение: java.sql.SQLTransientConnectionException: springHikariCP - Соединение недоступно, время запроса истекло после 30000 мс. Код ошибки: 0

1 Ответ

0 голосов
/ 31 августа 2018

Вам не нужно иметь дело с типом Future при использовании AsyncItemProcessor и AsyncItemWriter. Эти компоненты будут обрабатывать асинхронную обработку / запись прозрачно, так что вы, как конечный пользователь, можете использовать их как обычные процессоры / записывающие устройства, не имея дело с конструкциями параллелизма низкого уровня.

Например:

@Bean
public ItemProcessor<Employee, EmployeeResult> asyncItemProcessor() throws Exception{
    AsyncItemProcessor<Employee, EmployeeResult> asyncItemProcessor = new AsyncItemProcessor<>();
    asyncItemProcessor.setDelegate(employeeProcessor());
    asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
    return asyncItemProcessor;
}

@Bean
public ItemWriter<EmployeeResult> asyncItemWriter() throws Exception{
    AsyncItemWriter<EmployeeResult> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(employeeJobWriter());
    return asyncItemWriter;
}

@Bean
public Step employeeJobStep1() throws Exception {
    return stepBuilderFactory.get("step1")
            .allowStartIfComplete(allowRestart)
            .<Employee, EmployeeResult>chunk(chunkSize)
            .reader(employeeJobReader())
            .processor(asyncItemProcessor())
            .writer(asyncItemWriter())
            .build();
}
...