список параллельных подпотоков фактически не выполняется в весенней партии - PullRequest
0 голосов
/ 27 октября 2018

Я написал весеннюю пакетную программу для создания списка подпотоков. Количество подпотоков будет динамически определяться во время выполнения. Подпотоки будут работать параллельно. Пример кода вставлен ниже:

@Configuration
public class JobConfiguration {

@Bean
public Job myJob() {

    return jobBuilderFactory.get("myJob")
            .preventRestart()
            .incrementer(new RunIdIncrementer())
            .start(step1())
            .next(splitStep())
            .build();
}

@Bean
public Step splitStep() {

    return stepBuilderFactory.get("splitStep")
            .flow(createSubFlow())
            .build();
}

@Bean
@StepScope
public Flow createSubFlow() {
    List<Flow> streamSubFlowList = new ArrayList<>();

    List<Stream> streamList;

    try {
        streamList = dataSourceConfig.streamMapper().getStreamList();
    } catch (Exception e) {
        logger.error("getStreamList failed for", e);
    }
    streamList.forEach(stream -> {
        long streamId = stream.getStreamId();
        String streamFlowName = "stream" + streamId + "_flow";
        Flow streamSubFlow = new FlowBuilder<Flow>(streamFlowName)
                .start(step2(streamId))
                .next(step3(streamId))
                .end();

        logger.info("streamSubFlow is created for streamId: {}", streamId);

        streamSubFlowList.add(streamSubFlow);
    });

    return new FlowBuilder<Flow>("splitStreamFlow").split(new SimpleAsyncTaskExecutor())
            .add(streamSubFlowList.toArray(new Flow[0])).build();
}



public Step step2(long streamId) {
    return stepBuilderFactory.get("step2")
            .<Slot, Slot>chunk(1)
            .reader(reader2(streamId))
            .processor(processor2)
            .writer(writer2())
            .build();
}


public Step step3(long streamId) {
    return stepBuilderFactory.get("step3")
            .<Group, Group>chunk(1)
            .reader(reader3(streamId))
            .processor(processor3)
            .writer(writer3())
            .build();
}


public JdbcCursorItemReader<Slot> reader2(Long streamId) {
        return new JdbcCursorItemReaderBuilder<Slot>()
                        .dataSource(dataSourceConfig.dataSource())
                        .name("reader2")
                        .sql(Constants.QUERY2_SQL)
                        .preparedStatementSetter(preparedStatementSetter(streamId))
                        .rowMapper(new reader2RowMapper())
                        .build();
}

public JdbcCursorItemReader<Slot> reader3(Long streamId) {
        return new JdbcCursorItemReaderBuilder<Group>()
                        .dataSource(dataSourceConfig.dataSource())
                        .name("reader3")
                        .sql(Constants.QUERY3_SQL)
                        .preparedStatementSetter(preparedStatementSetter(streamId))
                        .rowMapper(new reader3RowMapper())
                        .build();
}


@Bean
public ItemWriter<Slot> writer2 () {

    JdbcBatchItemWriter<ForecastSlot> writer2 = new JdbcBatchItemWriter<>();
    writer2.setDataSource(dataSourceConfig.dataSource());
    writer2.setSql(Constants.INSERT_SLOT_SQL);
    writer2.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Slot>());

    return writer2;
}

@Bean
public ItemWriter<Group> writer3 () {

    JdbcBatchItemWriter<ForecastSkillGroup> writer3 = new JdbcBatchItemWriter<>();
    writer3.setDataSource(dataSourceConfig.dataSource());
    writer3.setSql(Constants.INSERT_GROUP_SQL);
    writer3.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Group>());

    return writer3;
}

}

После запуска пакета он успешно завершен. Кажется, 3 подпотока были созданы и работают параллельно. Тем не менее, база данных не обновляется. Я устанавливаю точки останова в rowmapper, но он не срабатывает. Я убедился, что в базе данных есть данные, поэтому нужно вызвать rowmapper.

2018-Oct-26 20:52:27.985 INFO  [main] c.t.w.p.b.c. - streamSubFlow is created for streamId: 3

2018-Oct-26 20:52:28.209 INFO  [main] c.t.w.p.b.c. - streamSubFlow is created for streamId: 4

2018-Oct-26 20:52:28.434 INFO  [main] c.t.w.p.b.c. - streamSubFlow is created for streamId: 5

2018-Oct-26 20:52:42.835 INFO  [SimpleAsyncTaskExecutor-4] o.s.b.c.j.SimpleStepHandler - Executing step: [step2]

2018-Oct-26 20:52:42.951 INFO  [SimpleAsyncTaskExecutor-17] o.s.b.c.j.SimpleStepHandler - Executing step: [step2]

2018-Oct-26 20:52:43.097 INFO  [SimpleAsyncTaskExecutor-7] o.s.b.c.j.SimpleStepHandler - Executing step: [step2]

2018-Oct-26 20:52:58.989 INFO  [SimpleAsyncTaskExecutor-17] o.s.b.c.j.SimpleStepHandler - Executing step: [step3]

2018-Oct-26 20:52:59.010 INFO  [SimpleAsyncTaskExecutor-4] o.s.b.c.j.SimpleStepHandler - Executing step: [step3]

2018-Oct-26 20:52:59.142 INFO  [SimpleAsyncTaskExecutor-38] o.s.b.c.j.SimpleStepHandler - Executing step: [step3]

Любая идея, почему подпотоки не обновили базу данных. Заранее спасибо!

...