Как грипп sh после n записей совершено - PullRequest
0 голосов
/ 04 апреля 2020

Я настроил два источника данных в моем пакетном приложении. Одним из них является чтение записей из базы данных1, а других - для записи записей в базу данных2.

Я использовал пружинный пакет для пакетной обработки записей. Тем не менее, после вставки 3 фрагментов записей, «значение дублированного ключа нарушает ограничение уникальности», когда дублирующий ключ отсутствует. Я прочитал несколько постов, где было предложено flu sh менеджер сущностей, чтобы удалить кэшированные объекты. Я до сих пор не понимаю, как добиться этого с моим текущим приложением, и не делает ли подпружиненная партия гриппа sh после каждой обработки чанка?

@Slf4j
@Configuration
@RequiredArgsConstructor
public class BatchJob{

    private final JobBuilderFactory jobBuilderFactory;

    private final StepBuilderFactory stepBuilderFactory;

    private final WriterRepository writerRepository;

    private final ReaderRepository readerRepository;

    private final BatchConfigurer batchConfigurer;

    @Qualifier("readerDatasource")
    private final HikariDataSource readerDatasource;

    private final HikariDataSource writerDatasource;

    private static final String QUERY_FIND_Reader_Person = "SELECT * FROM person where exists='Y'";

    private static final String QUERY_FIND_Reader_Adderess = "SELECT * from address where exists = 'Y'";

    private static final String QUERY_INSERT_Writer_Person = "INSERT INTO person (person_id,name,exists) VALUES(:personId,:name,:exists)";

    private static final String QUERY_INSERT_Writer_Address = "INSERT INTO address (id,person_id,street,exists)VALUES(:id,:personId,:street,:exist)";

    @Bean
    Job job() {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .flow(importPersonDetails())
                .next(importAddressDetails())
                .end()
                .build();
    }

    Step importPersonDetails() {
        return this.stepBuilderFactory.get("importPersonDetails")
                .<Person, Person>chunk(500)
                .reader(personItemReader())
                .writer(personWriter())
                .taskExecutor(threadPoolTaskExecutor())
                .listener(new StepListener())
                .build();
    }

    Step importAddressDetails(){
        return this.stepBuilderFactory.get("importAddressDetails")
                .<Address, Address>chunk(500)
                .reader(addressItemReader())
                .writer(addressWriter())
                .taskExecutor(threadPoolTaskExecutor())
                .listener(new StepListener())
                .build();
    }

    JdbcCursorItemReader<Person> personItemReader() {
        return new JdbcCursorItemReaderBuilder<Person>()
                .dataSource(readerDatasource)
                .sql(QUERY_FIND_Reader_Person)
                .rowMapper(new PersonRowMapper())
                .name("PersonItemReader")
                .build();
    }

    JdbcCursorItemReader<Address> AddressItemReader() {
        return new JdbcCursorItemReaderBuilder<Address>()
                .dataSource(readerDatasource)
                .sql(QUERY_FIND_Reader_Adderess)
                .rowMapper(new AddressRowMapper())
                .name("AddressItemReader")
                .build();
    }

    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(100);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setThreadnamePrefix("My-Batch-Jobs-TaskExecutor ");
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        threadPoolTaskExecutor.initialize();
        log.info("Thread Pool Initialized with min {} and Max {} Pool Size",threadPoolTaskExecutor.getCorePoolSize(),threadPoolTaskExecutor.getMaxPoolSize() );
        return threadPoolTaskExecutor;
    }

    JdbcBatchItemWriter<Person> PersonWriter() {
        JdbcBatchItemWriter<Person> PersonJdbcBatchItemWriter = new JdbcBatchItemWriter<>();
        PersonJdbcBatchItemWriter.setDataSource(writerDatasource);
        PersonJdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        PersonJdbcBatchItemWriter.setSql(QUERY_INSERT_Writer_PersonPerson);
        PersonJdbcBatchItemWriter.afterPropertiesSet();
        return PersonJdbcBatchItemWriter;
    }

    JdbcBatchItemWriter<Address> AddressWriter() {
        JdbcBatchItemWriter<Address> AddressJdbcBatchItemWriter = new JdbcBatchItemWriter<>();
        AddressJdbcBatchItemWriter.setDataSource(writerDatasource);
        AddressJdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Address>());
        AddressJdbcBatchItemWriter.setSql(QUERY_INSERT_Writer_Address);
        AddressJdbcBatchItemWriter.afterPropertiesSet();
        return  AddressJdbcBatchItemWriter;
    }
}

1 Ответ

0 голосов
/ 06 апреля 2020

«значение дублированного ключа нарушает ограничение уникальности», когда нет дублирующего ключа`

Код не сгенерирует это исключение, если не было дубликатов. Так что если у вас есть это исключение, у вас есть дубликаты.

Я прочитал несколько сообщений, в которых было предложено flu sh диспетчер сущностей для удаления кэшированных объектов

В соответствии с вашей конфигурацией вы используете JdbcBatchItemWriter а не JpaItemWriter, поэтому я не понимаю, почему мы говорим об очистке менеджера сущностей здесь. JdbcBatchItemWriter не использует менеджер сущностей, он использует шаблон jdb c для отправки операторов вставки / обновления в вашу базу данных, которые будут зафиксированы / откатаны после завершения транзакции.

Вам необходимо убедитесь, что вы не вставляете дубликаты в последующие фрагменты.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...