Я пытаюсь написать решение для масштабирования весенней партии. В весеннем пакете он читает данные (600 000) из базы данных MySQL, затем обрабатывает их и обновляет состояние каждой обработанной строки как «Завершено».
Я использую AsyncItemProcessor
и AsyncItemWriter
для масштабирования партии пружины.
Проблема:
Если я запускаю партию пружины синхронно, то при выполнении асинхронной порции пружины требуется то же или меньше времени. Я не получаю выгоду от использования многопоточности.
package com.example.batchprocessing;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public JdbcCursorItemReader<Person> reader(DataSource dataSource) {
JdbcCursorItemReader<Person> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql("SELECT * from people");
reader.setRowMapper(new UserRowMapper());
return reader;
}
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
@Bean
public AsyncItemProcessor<Person, Person> asyncItemProcessor() throws Exception {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(30);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(10000);
executor.setThreadNamePrefix("BatchProcessing-");
executor.afterPropertiesSet();
AsyncItemProcessor<Person, Person> asyncProcessor = new AsyncItemProcessor<>();
asyncProcessor.setDelegate(processor());
asyncProcessor.setTaskExecutor(executor);
asyncProcessor.afterPropertiesSet();
return asyncProcessor;
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("UPDATE people set status= 'completed' where person_id= :id")
.dataSource(dataSource)
.build();
}
@Bean
public AsyncItemWriter<Person> asyncItemWriter() {
AsyncItemWriter<Person> asyncWriter = new AsyncItemWriter<>();
asyncWriter.setDelegate(writer(null));
return asyncWriter;
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) throws Exception {
return stepBuilderFactory.get("step1")
.<Person, Person> chunk(10000)
.reader(reader(null))
//.processor(processor())
//.writer(writer)
.processor((ItemProcessor) asyncItemProcessor())
.writer(asyncItemWriter())
//.throttleLimit(30)
.build();
}
}
Что я делаю не так? Почему AsyncItemProcessor
занимает больше / столько же времени, сколько и синхронная обработка? Обычно это должно занимать меньше времени. Я вижу в журнале несколько потоков, но в конечном итоге время окончания совпадает с синхронной обработкой.