В настоящее время я создаю пакет в Groovy при получении информации о продукте с сервера. По сути, пакет считывает список объектов TGeneralProduct из базы данных и должен получить с сервера соответствующий TBeautyProduct. Каждый TGeneralProduct имеет серверные URL-адреса для вызова, чтобы актуализировать реальный продукт. Учитывая, что каждый вызов сервера занимает много времени, я хотел ускорить процесс, используя многопоточность в шаге. Таким образом, я актуализирую много продуктов одновременно. К сожалению, это не работает, но я не понимаю, почему. Вот мой конфиг:
Конфигурация Beans:
@Bean
public Job refreshBeautyProductsJob() {
return jobBuilderFactory.get("RefreshBeautyProductsJob")
.incrementer(new RunIdIncrementer())
.start(readAllBeautyProductsToProcessStep())
.build();
}
@Bean
ItemReader<TGeneralProduct> readAllBeautyProductsToProcessReader(SessionFactory sessionFactory) {
HibernateCursorItemReader databaseReader = new HibernateCursorItemReader()
databaseReader.setQueryString("from TGeneralProduct")
databaseReader.setUseStatelessSession(false)
databaseReader.setSessionFactory(sessionFactory)
databaseReader.setSaveState(false)
return databaseReader;
}
@Bean
ItemProcessor<TGeneralProduct, List<TBeautyProduct>> readAllBeautyProductsToProcessProcessor() {
new ReadAllBeautyProductsToProcessProcessor()
}
@Bean
ItemWriter<List<List<TBeautyProduct>>> readAllBeautyProductsToProcessWriter(){
new ReadAllBeautyProductsToProcessWriter()
}
@Bean
public Step readAllBeautyProductsToProcessStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("readAllBeautyProductsToProcessStep")
.transactionManager(transactionManager)
.<TGeneralProduct, TBeautyProduct> chunk(10)
.reader(readAllBeautyProductsToProcessReader())
.processor(readAllBeautyProductsToProcessProcessor())
.writer(readAllBeautyProductsToProcessWriter())
.taskExecutor(taskExecutorReadAllBeautyProductsToProcessStep())
.build();
}
@Bean
TaskExecutor taskExecutorReadAllBeautyProductsToProcessStep() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(100);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
Процессор:
List<TBeautyProduct> process(TGeneralProduct generalProduct) throws Exception {
List<TBeautyProduct> tBeautyProductList = new ArrayList<>()
for(TUrl url : generalProduct.getUrls()){
// contacting server here and getting information about beauty product.
}
return tBeautyProductList
}
Writer:
public void write(List<? extends List<TBeautyProduct>> items) throws Exception {
for(List<TBeautyProduct> listTBeautyProducts : items){
for(TBeautyProduct tBeautyProduct : listTBeautyProducts){
// Write objects to database.
}
}
}
Когда я запускаю пакет, Шаг никогда не выполняется параллельно, и он занимает вечность (у меня есть более 10000 продуктов для реализации). Я также попытался заменить ThreadPoolExecutorTask на SimpleAsyncTaskExecutor. Но тогда я получаю ошибки, что Читатель не был закрыт должным образом. Буду признателен за помощь.