Обработка больших данных с использованием многопоточного шага Spring Batch и RepositoryItemWriter / RepositoryItemReader - PullRequest
0 голосов
/ 22 октября 2019

Я пытаюсь написать приложение пакетной обработки, используя пружинный пакет с многопоточным шагом. Это простое приложение, считывающее данные из таблицы и записывающее в другую таблицу, но объем данных составляет около 2 миллионов записей.

Я использую RepositoryItemReader & RepositoryItemWriter для чтения и записи данных. Но после обработки некоторых данных происходит сбой из-за невозможности установить соединение JDBC.

//Config.Java

 @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(10);
        return taskExecutor;
    }

    @Bean(name = "personJob")
    public Job personKeeperJob() {

        Step step = stepBuilderFactory.get("step-1")
                .<User, Person> chunk(1000)
                .reader(userReader)
                .processor(jpaProcessor)
                .writer(personWriter)
                .taskExecutor(taskExecutor())
                .throttleLimit(10)
                .build();

        Job job = jobBuilderFactory.get("person-job")
                .incrementer(new RunIdIncrementer())
                .listener(this)
                .start(step)
                .build();

        return job;
    }


//Processor.Java

@Override
    public Person process(User user) throws Exception {
        Optional<User> userFromDb = userRepo.findById(user.getUserId());
        Person person = new Person();

        if(userFromDb.isPresent()) {
            person.setName(userFromDb.get().getName());
            person.setUserId(userFromDb.get().getUserId());
            person.setDept(userFromDb.get().getDept());
        }

        return person;
    }


//Reader.Java

@Autowired
    public UserItemReader(final UserRepository repository) {
        super();
        this.repository = repository;
    }
    @PostConstruct
    protected void init() {
        final Map<String, Sort.Direction> sorts = new HashMap<>();
        sorts.put("userId", Direction.ASC);
        this.setRepository(this.repository);
        this.setSort(sorts);
        this.setMethodName("findAll");
    }

//Writer.Java

@PostConstruct
    protected void init() {
        this.setRepository(repository);
    }

    @Transactional
    public void write(List<? extends Person> persons) throws Exception {
        repository.saveAll(persons);
    }


application.properties

# Datasource

spring.datasource.platform=h2
spring.datasource.url=jdbc:h2:mem:batchdb
spring.main.allow-bean-definition-overriding=true
spring.datasource.hikari.maximum-pool-size=500

Error :

org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:447)
  ......................
Caused by: org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
    at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:48)

............................

Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30927ms.

1 Ответ

0 голосов
/ 23 октября 2019

У вас закончились соединения.

Попробуйте установить пул соединений Hikari на большее число:

spring.datasource.hikari.maximum-pool-size=20
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...