Сохраните соединения JDBC с помощью JdbcCursorItemReader или JdbcPagingItemReader - PullRequest
0 голосов
/ 22 марта 2019

В весеннем пакетном проекте я использовал JdbcCursorItemReader для чтения данных для их параллельной обработки. Я могу запустить партию локально без проблем.

Я также слышал, что JdbcPagingItemReader рекомендуется для параллельной обработки с JdbcCursorItemReader, поскольку средство чтения курсоров будет слишком долго удерживать соединение, в то время как устройство чтения пейджинга может разорвать соединение, как только достигнут размер страницы.

Затем я переключился на JdbcPagingItemReader на шаге 2, но из-за удивления я получил исключение ниже при локальном запуске.

Причина: java.sql.SQLTransientConnectionException: HikariPool-1 - Соединение недоступно, время запроса истекло после 300001 мс.

Однако, похоже, что вышеупомянутое исключение происходит на шаге 1 до того, как считыватель пейджинга на шаге 2 будет выполнен, и это единственное сделанное изменение. Пожалуйста, пролите некоторый свет на то, почему возникает исключение, и если это хорошая практика, использовать параллельный просмотрщик страниц вместо курсора. Очень признателен за вашу помощь!

Фрагмент кода вставлен ниже:

@Bean
@StepScope
public Flow createParallelSubFlow() {
    List<Flow> subFlowList = new ArrayList<>();

    List<Stream> streamList;

    try {
        streamList = dataSourceConfig.streamMapper().
                getStreamListByStatus(Constants.PENDING_STATUS_CD);
    } catch (Exception e) {
    }
    streamList.forEach(stream -> {
        long id = stream.getStreamId();
        String flowName = "stream" + id + "_flow";
        Flow subFlow = new FlowBuilder<Flow>(flowName)
                .start(step1(id))
                .next(step2(id))
                .end();

        subFlowList.add(subFlow);
    });

    return new FlowBuilder<Flow>("splitFlow").split(new SimpleAsyncTaskExecutor())
            .add(subFlowList.toArray(new Flow[0])).build();

}

public Step step1(long id) {
    return stepBuilderFactory.get("step1")
            .<Domain, Domain>chunk(100)
            .reader(reader1(id))
            .writer(writer1())
            .build();
}

//@StepScope
//@Bean
public Step step2(long id) {

    return stepBuilderFactory.get("step2")
            .<Domain, Domain>chunk(100)
            .reader(cursorReader2(id))
            .processor(processor2)
            .writer(writer2())
            .build();
}


public JdbcCursorItemReader<Domain> cursorReader2(Long id) {

    return new JdbcCursorItemReaderBuilder<Domain>()
            .dataSource(dataSourceConfig.dataSource())
            .name("cursorReader")
            .sql(Constants.QUERY_SQL)
            .preparedStatementSetter(new PreparedStatementSetter() {

                @Override
                public void setValues(PreparedStatement ps) throws SQLException {
                    ps.setLong(1, id);
                }})
            .rowMapper(new RowMapper())
            .build();
}


//Switch from cursorReader2 to pagingReader2 in step2
public JdbcPagingItemReader<Domain> pagingReader2(Long id) {

    return new JdbcPagingItemReaderBuilder<Domain>()
            .dataSource(dataSourceConfig.dataSource())
            .name("pagingReader")
            .queryProvider(queryProvider())
            .parameterValues(parameterValues(id))                
            .rowMapper(new RowMapper())
            .pageSize(100)
            .build();
}

@Bean
public PagingQueryProvider queryProvider() {
    SqlPagingQueryProviderFactoryBean providerFactory = new SqlPagingQueryProviderFactoryBean();

    Map<String, Order> sortKeys = new HashMap<>(2);
    sortKeys.put("ID", Order.ASCENDING);

    providerFactory.setDataSource(dataSourceConfig.dataSource());
    providerFactory.setSelectClause("SELECT Clause");
    providerFactory.setFromClause("FROM Clause");
    providerFactory.setWhereClause("WHERE Clause");
    providerFactory.setSortKeys(sortKeys);

    PagingQueryProvider pagingQueryProvider = null;
    try {
        pagingQueryProvider = providerFactory.getObject();
    } catch (Exception e) {
        logger.error("Failed to get PagingQueryProvider", e);
        throw new RuntimeException("Failed to get PagingQueryProvider", e);
    }

    return pagingQueryProvider;
}

private Map<String, Object> parameterValues(Long id) {

    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("1", id);

    return parameterValues;

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...