Я настроил два источника данных в моем пакетном приложении. Одним из них является чтение записей из базы данных1, а других - для записи записей в базу данных2.
Я использовал пружинный пакет для пакетной обработки записей. Тем не менее, после вставки 3 фрагментов записей, «значение дублированного ключа нарушает ограничение уникальности», когда дублирующий ключ отсутствует. Я прочитал несколько постов, где было предложено flu sh менеджер сущностей, чтобы удалить кэшированные объекты. Я до сих пор не понимаю, как добиться этого с моим текущим приложением, и не делает ли подпружиненная партия гриппа sh после каждой обработки чанка?
@Slf4j
@Configuration
@RequiredArgsConstructor
public class BatchJob{
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final WriterRepository writerRepository;
private final ReaderRepository readerRepository;
private final BatchConfigurer batchConfigurer;
@Qualifier("readerDatasource")
private final HikariDataSource readerDatasource;
private final HikariDataSource writerDatasource;
private static final String QUERY_FIND_Reader_Person = "SELECT * FROM person where exists='Y'";
private static final String QUERY_FIND_Reader_Adderess = "SELECT * from address where exists = 'Y'";
private static final String QUERY_INSERT_Writer_Person = "INSERT INTO person (person_id,name,exists) VALUES(:personId,:name,:exists)";
private static final String QUERY_INSERT_Writer_Address = "INSERT INTO address (id,person_id,street,exists)VALUES(:id,:personId,:street,:exist)";
@Bean
Job job() {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.flow(importPersonDetails())
.next(importAddressDetails())
.end()
.build();
}
Step importPersonDetails() {
return this.stepBuilderFactory.get("importPersonDetails")
.<Person, Person>chunk(500)
.reader(personItemReader())
.writer(personWriter())
.taskExecutor(threadPoolTaskExecutor())
.listener(new StepListener())
.build();
}
Step importAddressDetails(){
return this.stepBuilderFactory.get("importAddressDetails")
.<Address, Address>chunk(500)
.reader(addressItemReader())
.writer(addressWriter())
.taskExecutor(threadPoolTaskExecutor())
.listener(new StepListener())
.build();
}
JdbcCursorItemReader<Person> personItemReader() {
return new JdbcCursorItemReaderBuilder<Person>()
.dataSource(readerDatasource)
.sql(QUERY_FIND_Reader_Person)
.rowMapper(new PersonRowMapper())
.name("PersonItemReader")
.build();
}
JdbcCursorItemReader<Address> AddressItemReader() {
return new JdbcCursorItemReaderBuilder<Address>()
.dataSource(readerDatasource)
.sql(QUERY_FIND_Reader_Adderess)
.rowMapper(new AddressRowMapper())
.name("AddressItemReader")
.build();
}
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(100);
threadPoolTaskExecutor.setMaxPoolSize(200);
threadPoolTaskExecutor.setThreadnamePrefix("My-Batch-Jobs-TaskExecutor ");
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
threadPoolTaskExecutor.initialize();
log.info("Thread Pool Initialized with min {} and Max {} Pool Size",threadPoolTaskExecutor.getCorePoolSize(),threadPoolTaskExecutor.getMaxPoolSize() );
return threadPoolTaskExecutor;
}
JdbcBatchItemWriter<Person> PersonWriter() {
JdbcBatchItemWriter<Person> PersonJdbcBatchItemWriter = new JdbcBatchItemWriter<>();
PersonJdbcBatchItemWriter.setDataSource(writerDatasource);
PersonJdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
PersonJdbcBatchItemWriter.setSql(QUERY_INSERT_Writer_PersonPerson);
PersonJdbcBatchItemWriter.afterPropertiesSet();
return PersonJdbcBatchItemWriter;
}
JdbcBatchItemWriter<Address> AddressWriter() {
JdbcBatchItemWriter<Address> AddressJdbcBatchItemWriter = new JdbcBatchItemWriter<>();
AddressJdbcBatchItemWriter.setDataSource(writerDatasource);
AddressJdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Address>());
AddressJdbcBatchItemWriter.setSql(QUERY_INSERT_Writer_Address);
AddressJdbcBatchItemWriter.afterPropertiesSet();
return AddressJdbcBatchItemWriter;
}
}