Многопоточность Spring Batch Step не работает - PullRequest
0 голосов
/ 03 мая 2020

В настоящее время я создаю пакет в Groovy при получении информации о продукте с сервера. По сути, пакет считывает список объектов TGeneralProduct из базы данных и должен получить с сервера соответствующий TBeautyProduct. Каждый TGeneralProduct имеет серверные URL-адреса для вызова, чтобы актуализировать реальный продукт. Учитывая, что каждый вызов сервера занимает много времени, я хотел ускорить процесс, используя многопоточность в шаге. Таким образом, я актуализирую много продуктов одновременно. К сожалению, это не работает, но я не понимаю, почему. Вот мой конфиг:

Конфигурация Beans:

@Bean
    public Job refreshBeautyProductsJob() {
        return jobBuilderFactory.get("RefreshBeautyProductsJob")
                .incrementer(new RunIdIncrementer())
                .start(readAllBeautyProductsToProcessStep())
                .build();
    }
@Bean
    ItemReader<TGeneralProduct> readAllBeautyProductsToProcessReader(SessionFactory sessionFactory) {
        HibernateCursorItemReader databaseReader = new HibernateCursorItemReader()
        databaseReader.setQueryString("from TGeneralProduct")
        databaseReader.setUseStatelessSession(false)
        databaseReader.setSessionFactory(sessionFactory)
        databaseReader.setSaveState(false)
        return databaseReader;
    }

    @Bean
    ItemProcessor<TGeneralProduct, List<TBeautyProduct>> readAllBeautyProductsToProcessProcessor() {
        new ReadAllBeautyProductsToProcessProcessor()
    }

    @Bean
    ItemWriter<List<List<TBeautyProduct>>> readAllBeautyProductsToProcessWriter(){
        new ReadAllBeautyProductsToProcessWriter()
    }

    @Bean
    public Step readAllBeautyProductsToProcessStep(PlatformTransactionManager transactionManager) {
        return stepBuilderFactory.get("readAllBeautyProductsToProcessStep")
                .transactionManager(transactionManager)
                .<TGeneralProduct, TBeautyProduct> chunk(10)
                .reader(readAllBeautyProductsToProcessReader())
                .processor(readAllBeautyProductsToProcessProcessor())
                .writer(readAllBeautyProductsToProcessWriter())
                .taskExecutor(taskExecutorReadAllBeautyProductsToProcessStep())
                .build();
    }

    @Bean
    TaskExecutor taskExecutorReadAllBeautyProductsToProcessStep() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(100);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

Процессор:

List<TBeautyProduct> process(TGeneralProduct generalProduct) throws Exception {
        List<TBeautyProduct> tBeautyProductList = new ArrayList<>()
        for(TUrl url : generalProduct.getUrls()){
            // contacting server here and getting information about beauty product.
        }
        return tBeautyProductList
    }

Writer:

public void write(List<? extends List<TBeautyProduct>> items) throws Exception {
        for(List<TBeautyProduct> listTBeautyProducts : items){
            for(TBeautyProduct tBeautyProduct : listTBeautyProducts){
               // Write objects to database.
            }
        }
    }

Когда я запускаю пакет, Шаг никогда не выполняется параллельно, и он занимает вечность (у меня есть более 10000 продуктов для реализации). Я также попытался заменить ThreadPoolExecutorTask на SimpleAsyncTaskExecutor. Но тогда я получаю ошибки, что Читатель не был закрыт должным образом. Буду признателен за помощь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...