В весеннем пакетном проекте я использовал JdbcCursorItemReader для чтения данных для их параллельной обработки. Я могу запустить партию локально без проблем.
Я также слышал, что JdbcPagingItemReader рекомендуется для параллельной обработки с JdbcCursorItemReader, поскольку средство чтения курсоров будет слишком долго удерживать соединение, в то время как устройство чтения пейджинга может разорвать соединение, как только достигнут размер страницы.
Затем я переключился на JdbcPagingItemReader на шаге 2, но из-за удивления я получил исключение ниже при локальном запуске.
Причина: java.sql.SQLTransientConnectionException: HikariPool-1 -
Соединение недоступно, время запроса истекло после 300001 мс.
Однако, похоже, что вышеупомянутое исключение происходит на шаге 1 до того, как считыватель пейджинга на шаге 2 будет выполнен, и это единственное сделанное изменение. Пожалуйста, пролите некоторый свет на то, почему возникает исключение, и если это хорошая практика, использовать параллельный просмотрщик страниц вместо курсора. Очень признателен за вашу помощь!
Фрагмент кода вставлен ниже:
@Bean
@StepScope
public Flow createParallelSubFlow() {
List<Flow> subFlowList = new ArrayList<>();
List<Stream> streamList;
try {
streamList = dataSourceConfig.streamMapper().
getStreamListByStatus(Constants.PENDING_STATUS_CD);
} catch (Exception e) {
}
streamList.forEach(stream -> {
long id = stream.getStreamId();
String flowName = "stream" + id + "_flow";
Flow subFlow = new FlowBuilder<Flow>(flowName)
.start(step1(id))
.next(step2(id))
.end();
subFlowList.add(subFlow);
});
return new FlowBuilder<Flow>("splitFlow").split(new SimpleAsyncTaskExecutor())
.add(subFlowList.toArray(new Flow[0])).build();
}
public Step step1(long id) {
return stepBuilderFactory.get("step1")
.<Domain, Domain>chunk(100)
.reader(reader1(id))
.writer(writer1())
.build();
}
//@StepScope
//@Bean
public Step step2(long id) {
return stepBuilderFactory.get("step2")
.<Domain, Domain>chunk(100)
.reader(cursorReader2(id))
.processor(processor2)
.writer(writer2())
.build();
}
public JdbcCursorItemReader<Domain> cursorReader2(Long id) {
return new JdbcCursorItemReaderBuilder<Domain>()
.dataSource(dataSourceConfig.dataSource())
.name("cursorReader")
.sql(Constants.QUERY_SQL)
.preparedStatementSetter(new PreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps) throws SQLException {
ps.setLong(1, id);
}})
.rowMapper(new RowMapper())
.build();
}
//Switch from cursorReader2 to pagingReader2 in step2
public JdbcPagingItemReader<Domain> pagingReader2(Long id) {
return new JdbcPagingItemReaderBuilder<Domain>()
.dataSource(dataSourceConfig.dataSource())
.name("pagingReader")
.queryProvider(queryProvider())
.parameterValues(parameterValues(id))
.rowMapper(new RowMapper())
.pageSize(100)
.build();
}
@Bean
public PagingQueryProvider queryProvider() {
SqlPagingQueryProviderFactoryBean providerFactory = new SqlPagingQueryProviderFactoryBean();
Map<String, Order> sortKeys = new HashMap<>(2);
sortKeys.put("ID", Order.ASCENDING);
providerFactory.setDataSource(dataSourceConfig.dataSource());
providerFactory.setSelectClause("SELECT Clause");
providerFactory.setFromClause("FROM Clause");
providerFactory.setWhereClause("WHERE Clause");
providerFactory.setSortKeys(sortKeys);
PagingQueryProvider pagingQueryProvider = null;
try {
pagingQueryProvider = providerFactory.getObject();
} catch (Exception e) {
logger.error("Failed to get PagingQueryProvider", e);
throw new RuntimeException("Failed to get PagingQueryProvider", e);
}
return pagingQueryProvider;
}
private Map<String, Object> parameterValues(Long id) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("1", id);
return parameterValues;
}