Разделы Spring Batch не работают должным образом - PullRequest
0 голосов
/ 05 мая 2020

Я работаю над примером Spring Boot и Spring Batch , взяв ссылку из: https://mkyong.com/spring-batch/spring-batch-partitioning-example/.

В этом примере я преобразовал код в на основе аннотации, но она не работает. Его единственное сохранение записей в файле CSV до 40 в блоке из 10. Почему другие не получают записи?

Журналы:

2020-05-06 01:31:39.541  INFO 24180 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.

Starting : Thread1, fromId : 1, toId : 10

Starting : Thread2, fromId : 11, toId : 20

Starting : Thread3, fromId : 21, toId : 30

Starting : Thread4, fromId : 31, toId : 40

Starting : Thread5, fromId : 41, toId : 50

Starting : Thread6, fromId : 51, toId : 60

Starting : Thread7, fromId : 61, toId : 70

Starting : Thread8, fromId : 71, toId : 80

Starting : Thread9, fromId : 81, toId : 90

Starting : Thread10, fromId : 91, toId : 100
2020-05-06 01:31:39.676  WARN 24180 --- [           main] o.s.b.c.c.a.DefaultBatchConfigurer       : No transaction manager was provided, using a DataSourceTransactionManager
2020-05-06 01:31:39.770  INFO 24180 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2020-05-06 01:31:39.927  INFO 24180 --- [           main] .e.SpringBatchPartitionMkyongApplication : Started SpringBatchPartitionMkyongApplication in 2.078 seconds (JVM running for 3.324)
2020-05-06 01:31:39.928  INFO 24180 --- [           main] o.s.b.a.b.JobLauncherCommandLineRunner   : Running default command line with: [--spring.output.ansi.enabled=always]
2020-05-06 01:31:39.975  INFO 24180 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitionJob]] launched with the following parameters: [{date=1588702900000, time=1588702899832, run.id=1, -spring.output.ansi.enabled=always}]
2020-05-06 01:31:40.007  INFO 24180 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]

Starting : Thread1, fromId : 1, toId : 10

Starting : Thread2, fromId : 11, toId : 20

Starting : Thread3, fromId : 21, toId : 30

Starting : Thread4, fromId : 31, toId : 40
2020-05-06 01:31:40.142  INFO 24180 --- [cTaskExecutor-4] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:partition4] executed in 120ms
2020-05-06 01:31:40.142  INFO 24180 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:partition2] executed in 119ms
2020-05-06 01:31:40.142  INFO 24180 --- [cTaskExecutor-3] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:partition3] executed in 120ms
2020-05-06 01:31:40.142  INFO 24180 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:partition1] executed in 119ms
2020-05-06 01:31:40.146  INFO 24180 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 139ms
2020-05-06 01:31:40.150  INFO 24180 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitionJob]] completed with the following parameters: [{date=1588702900000, time=1588702899832, run.id=1, -spring.output.ansi.enabled=always}] and the following status: [COMPLETED] in 151ms
2020-05-06 01:31:40.152  INFO 24180 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitionJob]] launched with the following parameters: [{date=1588708900151, time=1588708900151}]
2020-05-06 01:31:40.156  INFO 24180 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]

Starting : Thread1, fromId : 1, toId : 10

Starting : Thread2, fromId : 11, toId : 20

Starting : Thread3, fromId : 21, toId : 30

Starting : Thread4, fromId : 31, toId : 40
2020-05-06 01:31:40.203  INFO 24180 --- [cTaskExecutor-7] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:partition4] executed in 39ms
2020-05-06 01:31:40.203  INFO 24180 --- [cTaskExecutor-5] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:partition1] executed in 40ms
2020-05-06 01:31:40.203  INFO 24180 --- [cTaskExecutor-8] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:partition3] executed in 39ms
2020-05-06 01:31:40.209  INFO 24180 --- [cTaskExecutor-6] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:partition2] executed in 46ms
2020-05-06 01:31:40.211  INFO 24180 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 55ms
2020-05-06 01:31:40.213  INFO 24180 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitionJob]] completed with the following parameters: [{date=1588708900151, time=1588708900151}] and the following status: [COMPLETED] in 60ms
STATUS :: COMPLETED
2020-05-06 01:31:40.215  INFO 24180 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2020-05-06 01:31:40.219  INFO 24180 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

JobConfig. java

@Configuration
public class JobConfig {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    @StepScope
    public JdbcPagingItemReader<User> userJdbcPagingItemReader(
            @Value("#{stepExecutionContext[fromId]}") Integer fromId,
            @Value("#{stepExecutionContext[toId]}") Integer toId) {

        // reading database records using JDBC in a paging fashion. 
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(10);
        reader.setRowMapper(new UserRowMapper());

        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        // MySQL implementation of a PagingQueryProvider using database specific features.
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("select id, username, password, age");
        queryProvider.setFromClause("from user");
        queryProvider.setWhereClause("where id >= :fromId and id <= :toId");
        queryProvider.setSortKeys(sortKeys);

        // Parameter Values
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("fromId", fromId);
        parameterValues.put("toId", toId);

        reader.setQueryProvider(queryProvider);
        reader.setParameterValues(parameterValues);
        return reader;
    }

    @Bean
    @StepScope
    public FlatFileItemWriter<User> fileItemWriter(
            @Value("#{stepExecutionContext[fromId]}") Integer fromId,
            @Value("#{stepExecutionContext[toId]}") Integer toId) throws Exception {

        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource("csv/outputs/users.processed" + fromId + "-" + toId));
        writer.setAppendAllowed(false);
        writer.setLineAggregator(new UserLineAggregator()); 
        writer.afterPropertiesSet();
        return writer;
    }

    @Bean
    public Partitioner partioner() {
        Partitioner partitioner = new RangePartitioner();
        partitioner.partition(10);
        return partitioner;
    }

    // Master
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .partitioner(slaveStep().getName(), partioner())
                .step(slaveStep())
                .gridSize(4)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    // slave step
    @Bean
    public Step slaveStep() throws Exception  {
        return stepBuilderFactory.get("slaveStep")
                .<User, User>chunk(10)
                .reader(userJdbcPagingItemReader(null, null))
                .writer(fileItemWriter(null, null))
                .build();
    }


    @Bean(name="partitionJob")
    public Job job() throws Exception {
        return jobBuilderFactory.get("partitionJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}

RangePartitioner. java @Component publi c класс RangePartitioner реализует Partitioner {

@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    Map<String, ExecutionC

ontext> result = new HashMap<>();

        int range = 10;
        int fromId = 1;
        int toId = range;

        for (int i = 1; i <= gridSize; i++) {
            ExecutionContext value = new ExecutionContext();

            System.out.println("\nStarting : Thread" + i +", fromId : " + fromId+", toId : " + toId);

            value.putInt("fromId", fromId);
            value.putInt("toId", toId);

            // give each thread a name
            value.putString("name", "Thread" + i);

            result.put("partition" + i, value);

            fromId = toId + 1;
            toId += range;

        }
        return result;
    }
}
...