Ниже приведена конфигурация моего весеннего пакетного задания, которое принимает записи из БД, выполняет некоторую обработку в процессоре элементов, обновляет столбец состояния и записывает обратно в БД.
Когда я бегал по 10 тыс. Записей, я мог видеть, как он берет каждую запись одну за другой и обновляет статус таким же образом.Первоначально я планировал использовать многопоточность, но это не имеет никакого смысла, так как моя работа выполняется один раз в день с количеством записей от 10 до 100 тыс.(Записи составляют менее 5 тыс. В большинстве дней, и очень мало дней в году (от 5 до 10 дней) до 50–100 тыс.).
Я не хочу добавлять больше процессоров и получать оплату от Kubernetes только на 10 дней в году.Теперь проблема в том, что когда я запустил это задание, для каждого запроса на выборку требуется всего 100 записей, а не по 100 за раз.Кроме того, обновление - это также одна запись за раз, и для обработки 10 тыс. Записей требуется 10 минут, что очень медленно.
Как можно быстрее читать, обрабатывать и записывать? Я могу избавиться от многопоточности и время от времени немного больше загружать процессор.Дополнительная информация представлена в виде комментариев в коде.
@Configuration
@EnableBatchProcessing
public class BatchConfiguration extends DefaultBatchConfigurer{
public final static Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);
@Autowired
JobBuilderFactory jobBuilderFactory;
@Autowired
StepBuilderFactory stepBuilderFactory;
@Autowired
MyRepository myRepository;
@Autowired
private EntityManagerFactory entityManagerFactory;
@Value("${chunk-size}")
private int chunkSize;
@Value("${max-threads}")
private int maxThreads;
private final DataSource dataSource;
/**
* @param dataSource
* Override to do not set datasource even if a datasource exist during intialization.
* Initialize will use a Map based JobRepository (instead of database) for Spring batch meta tables
*/
@Override
public void setDataSource(DataSource dataSource) {
}
@Override
public PlatformTransactionManager getTransactionManager() {
return jpaTransactionManager();
}
@Autowired
public BatchConfiguration(@Qualifier("dataSource") DataSource dataSource) {
this.dataSource = dataSource;
}
@Bean
public JpaTransactionManager jpaTransactionManager() {
final JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setDataSource(dataSource);
return transactionManager;
}
@Bean
@StepScope
public JdbcPagingItemReader<ModelEntity> importReader() { // I tried using RepositoryItemReader but records were skipped by JPA hence I went for JdbcPagingItemReader
JdbcPagingItemReader<ModelEntity> reader = new JdbcPagingItemReader<ModelEntity>();
final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
sqlPagingQueryProviderFactoryBean.setDataSource( dataSource );
sqlPagingQueryProviderFactoryBean.setSelectClause( "SELECT *" );
sqlPagingQueryProviderFactoryBean.setFromClause( "FROM mytable" );
sqlPagingQueryProviderFactoryBean.setWhereClause( "WHERE STATUS = 'myvalue' " );
sqlPagingQueryProviderFactoryBean.setSortKey( "primarykey" );
try {
reader.setQueryProvider( sqlPagingQueryProviderFactoryBean.getObject() );
} catch (Exception e) {
e.printStackTrace();
}
reader.setDataSource( dataSource );
reader.setPageSize( chunkSize );
reader.setSaveState( Boolean.FALSE );
reader.setRowMapper( new BeanPropertyRowMapper<ModelEntity>(ModelEntity.class ) );
return reader;
}
@Bean
public ItemWriter<ModelEntity> databaseWriter() {
RepositoryItemWriter<ModelEntity> repositoryItemWriter=new RepositoryItemWriter<>();
repositoryItemWriter.setRepository(myRepository);
repositoryItemWriter.setMethodName("save");
return repositoryItemWriter;
}
@Bean
public Myprocessor myprocessor() {
return new Myprocessor();
}
@Bean
public JobExecutionListener jobExecutionListener() {
return new JobExecutionListener();
}
@Bean
public StepExecutionListener stepExecutionListener() {
return new StepExecutionListener();
}
@Bean
public ChunkExecutionListener chunkListener() {
return new ChunkExecutionListener();
}
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(maxThreads);
return taskExecutor;
}
@Bean
public Job processJob() {
return jobBuilderFactory.get("myjob")
.incrementer(new RunIdIncrementer())
.start(processStep())
.listener(jobExecutionListener())
.build();
}
@Bean
public Step processStep() {
return stepBuilderFactory.get("processStep")
.<ModelEntity,ModelEntity>chunk(chunkSize)
.reader(importReader())
.processor(myprocessor())
.writer(databaseWriter())
.taskExecutor(taskExecutor())
.listener(stepExecutionListener())
.listener(chunkListener())
.transactionManager(getTransactionManager())
.throttleLimit(maxThreads)
.build();
}
}
Я использую репозиторий JpaRepository
и код ниже.(При условии, что метод сохранения его родительского класса CrudRepository сделает сохранение)
public interface MyRepository extends JpaRepository<ModelEntity, BigInteger> {
}
Процессор такой, как показано ниже
@Component
public class Myprocessor implements ItemProcessor<Myprocessor,Myprocessor> {
@Override
public ModelEntity process(ModelEntity modelEntity) throws Exception {
try {
// This is fast and working fine
if ((myProcessing)) {
modelEntity.setStatus(success);
} else {
modelEntity.setStatus(failed);
}
}
catch (Exception e){
logger.info( "Exception occurred while processing"+e );
}
return modelEntity;
}
// This is fast and working fine
public Boolean myProcessing(ModelEntity modelEntity){
//Processor Logic Here
return processingStatus;
}
}
Файл свойств ниже
logging.level.org.hibernate.SQL=DEBUG
logging.level.com.zaxxer.hikari.HikariConfig=DEBUG
logging.level.org.hibernate.type.descriptor.sql.BasicBinder=TRACE
logging.level.org.springframework.jdbc.core.JdbcTemplate=DEBUG
logging.level.org.springframework.jdbc.core.StatementCreatorUtils=TRACE
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.url=url
spring.datasource.username=username
spring.datasource.password=password
spring.jpa.hibernate.connection.provider_class
=org.hibernate.hikaricp.internal.HikariCPConnectionProvider
spring.jpa.database-platform=org.hibernate.dialect.Oracle10gDialect
spring.jpa.show-sql=false
spring.main.allow-bean-definition-overriding=true
spring.batch.initializer.enabled=false
spring.batch.job.enabled=false
spring.batch.initialize-schema=never
chunk-size=100
max-threads=5