Я написал весеннюю пакетную программу для создания списка подпотоков. Количество подпотоков будет динамически определяться во время выполнения. Подпотоки будут работать параллельно. Пример кода вставлен ниже:
@Configuration
public class JobConfiguration {
@Bean
public Job myJob() {
return jobBuilderFactory.get("myJob")
.preventRestart()
.incrementer(new RunIdIncrementer())
.start(step1())
.next(splitStep())
.build();
}
@Bean
public Step splitStep() {
return stepBuilderFactory.get("splitStep")
.flow(createSubFlow())
.build();
}
@Bean
@StepScope
public Flow createSubFlow() {
List<Flow> streamSubFlowList = new ArrayList<>();
List<Stream> streamList;
try {
streamList = dataSourceConfig.streamMapper().getStreamList();
} catch (Exception e) {
logger.error("getStreamList failed for", e);
}
streamList.forEach(stream -> {
long streamId = stream.getStreamId();
String streamFlowName = "stream" + streamId + "_flow";
Flow streamSubFlow = new FlowBuilder<Flow>(streamFlowName)
.start(step2(streamId))
.next(step3(streamId))
.end();
logger.info("streamSubFlow is created for streamId: {}", streamId);
streamSubFlowList.add(streamSubFlow);
});
return new FlowBuilder<Flow>("splitStreamFlow").split(new SimpleAsyncTaskExecutor())
.add(streamSubFlowList.toArray(new Flow[0])).build();
}
public Step step2(long streamId) {
return stepBuilderFactory.get("step2")
.<Slot, Slot>chunk(1)
.reader(reader2(streamId))
.processor(processor2)
.writer(writer2())
.build();
}
public Step step3(long streamId) {
return stepBuilderFactory.get("step3")
.<Group, Group>chunk(1)
.reader(reader3(streamId))
.processor(processor3)
.writer(writer3())
.build();
}
public JdbcCursorItemReader<Slot> reader2(Long streamId) {
return new JdbcCursorItemReaderBuilder<Slot>()
.dataSource(dataSourceConfig.dataSource())
.name("reader2")
.sql(Constants.QUERY2_SQL)
.preparedStatementSetter(preparedStatementSetter(streamId))
.rowMapper(new reader2RowMapper())
.build();
}
public JdbcCursorItemReader<Slot> reader3(Long streamId) {
return new JdbcCursorItemReaderBuilder<Group>()
.dataSource(dataSourceConfig.dataSource())
.name("reader3")
.sql(Constants.QUERY3_SQL)
.preparedStatementSetter(preparedStatementSetter(streamId))
.rowMapper(new reader3RowMapper())
.build();
}
@Bean
public ItemWriter<Slot> writer2 () {
JdbcBatchItemWriter<ForecastSlot> writer2 = new JdbcBatchItemWriter<>();
writer2.setDataSource(dataSourceConfig.dataSource());
writer2.setSql(Constants.INSERT_SLOT_SQL);
writer2.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Slot>());
return writer2;
}
@Bean
public ItemWriter<Group> writer3 () {
JdbcBatchItemWriter<ForecastSkillGroup> writer3 = new JdbcBatchItemWriter<>();
writer3.setDataSource(dataSourceConfig.dataSource());
writer3.setSql(Constants.INSERT_GROUP_SQL);
writer3.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Group>());
return writer3;
}
}
После запуска пакета он успешно завершен. Кажется, 3 подпотока были созданы и работают параллельно. Тем не менее, база данных не обновляется. Я устанавливаю точки останова в rowmapper, но он не срабатывает. Я убедился, что в базе данных есть данные, поэтому нужно вызвать rowmapper.
2018-Oct-26 20:52:27.985 INFO [main] c.t.w.p.b.c. - streamSubFlow is created for streamId: 3
2018-Oct-26 20:52:28.209 INFO [main] c.t.w.p.b.c. - streamSubFlow is created for streamId: 4
2018-Oct-26 20:52:28.434 INFO [main] c.t.w.p.b.c. - streamSubFlow is created for streamId: 5
2018-Oct-26 20:52:42.835 INFO [SimpleAsyncTaskExecutor-4] o.s.b.c.j.SimpleStepHandler - Executing step: [step2]
2018-Oct-26 20:52:42.951 INFO [SimpleAsyncTaskExecutor-17] o.s.b.c.j.SimpleStepHandler - Executing step: [step2]
2018-Oct-26 20:52:43.097 INFO [SimpleAsyncTaskExecutor-7] o.s.b.c.j.SimpleStepHandler - Executing step: [step2]
2018-Oct-26 20:52:58.989 INFO [SimpleAsyncTaskExecutor-17] o.s.b.c.j.SimpleStepHandler - Executing step: [step3]
2018-Oct-26 20:52:59.010 INFO [SimpleAsyncTaskExecutor-4] o.s.b.c.j.SimpleStepHandler - Executing step: [step3]
2018-Oct-26 20:52:59.142 INFO [SimpleAsyncTaskExecutor-38] o.s.b.c.j.SimpleStepHandler - Executing step: [step3]
Любая идея, почему подпотоки не обновили базу данных. Заранее спасибо!