Пакет масштабирования Spring с использованием AsyncItemProcessor и AsyncItemWriter - PullRequest
0 голосов
/ 16 марта 2020

Я пытаюсь написать решение для масштабирования весенней партии. В весеннем пакете он читает данные (600 000) из базы данных MySQL, затем обрабатывает их и обновляет состояние каждой обработанной строки как «Завершено».

Я использую AsyncItemProcessor и AsyncItemWriter для масштабирования партии пружины.

Проблема:

Если я запускаю партию пружины синхронно, то при выполнении асинхронной порции пружины требуется то же или меньше времени. Я не получаю выгоду от использования многопоточности.

package com.example.batchprocessing;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.annotation.Transactional;

import java.util.Arrays;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;
	

    @Bean
    public JdbcCursorItemReader<Person> reader(DataSource dataSource) {
        JdbcCursorItemReader<Person> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        reader.setSql("SELECT * from people");
        reader.setRowMapper(new UserRowMapper());

        return reader;
    }



	@Bean
	public PersonItemProcessor processor() {

		return new PersonItemProcessor();
	}

	@Bean
	public AsyncItemProcessor<Person, Person> asyncItemProcessor() throws Exception {

		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(30);
		executor.setMaxPoolSize(50);
		executor.setQueueCapacity(10000);
		executor.setThreadNamePrefix("BatchProcessing-");
		executor.afterPropertiesSet();

		AsyncItemProcessor<Person, Person> asyncProcessor = new AsyncItemProcessor<>();
		asyncProcessor.setDelegate(processor());
		asyncProcessor.setTaskExecutor(executor);
		asyncProcessor.afterPropertiesSet();

		return asyncProcessor;
	}

	@Bean
	public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
		return new JdbcBatchItemWriterBuilder<Person>()
			.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
			.sql("UPDATE people set status= 'completed' where person_id= :id")
			.dataSource(dataSource)
			.build();
	}

    
	@Bean
	public AsyncItemWriter<Person> asyncItemWriter() {
		AsyncItemWriter<Person> asyncWriter = new AsyncItemWriter<>();
		asyncWriter.setDelegate(writer(null));

		return asyncWriter;
	}

	@Bean
	public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
		return jobBuilderFactory.get("importUserJob")
			.incrementer(new RunIdIncrementer())
			.listener(listener)
			.flow(step1)
			.end()
			.build();
	}

	@Bean
	public Step step1(JdbcBatchItemWriter<Person> writer) throws Exception {

		return stepBuilderFactory.get("step1")
			.<Person, Person> chunk(10000)
			.reader(reader(null))
			//.processor(processor())
			//.writer(writer)
			.processor((ItemProcessor) asyncItemProcessor())
			.writer(asyncItemWriter())
			//.throttleLimit(30)
			.build();
	}
	
}

Что я делаю не так? Почему AsyncItemProcessor занимает больше / столько же времени, сколько и синхронная обработка? Обычно это должно занимать меньше времени. Я вижу в журнале несколько потоков, но в конечном итоге время окончания совпадает с синхронной обработкой.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...