Spring Batch Reader читает альтернативные записи - PullRequest
1 голос
/ 03 мая 2020

Я создал пример весеннего пакетного приложения, которое пытается прочитать записи из БД и в программе записи отображает эти записи. Однако я мог видеть, что печатаются только четные (альтернативные) записи.

Это не проблема базы данных, поскольку поведение согласуется как с базой данных H2, так и с базой данных Oracle.

Там Всего 100 записей в моей БД.

С JDBCCursorItemReader считываются только 50 записей, которые чередуются с одной, что видно из снимка журнала

enter image description here

С JdbcPagingItemReader, только 5 записей читаются, и они тоже чередуются, что видно из снимка журнала enter image description here

Мои настройки кода приведены ниже. Почему читатель пропускает записи с нечетными номерами?

@Bean
public ItemWriter<Safety> safetyWriter() {
    return items -> {
        for (Safety item : items) {
            log.info(item.toString());
        }
    };
}

@Bean
public JdbcCursorItemReader<Safety> cursorItemReader() throws Exception {
    JdbcCursorItemReader<Safety> reader = new JdbcCursorItemReader<>();

    reader.setSql("select * from safety " );
    reader.setDataSource(dataSource);
    reader.setRowMapper(new SafetyRowMapper());
    reader.setVerifyCursorPosition(false);
    reader.afterPropertiesSet();

    return reader;
}

@Bean
    JdbcPagingItemReader<Safety> safetyPagingItemReader() throws Exception {
        JdbcPagingItemReader<Safety> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(dataSource);
        reader.setFetchSize(10);
        reader.setRowMapper(new SafetyRowMapper());

        H2PagingQueryProvider queryProvider = new H2PagingQueryProvider();
        queryProvider.setSelectClause("*");
        queryProvider.setFromClause("safety");

        Map<String, Order> sortKeys = new HashMap<>(1);

        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);

        return reader;
    }

@Bean
public Step importSafetyDetails() throws Exception {
    return stepBuilderFactory.get("importSafetyDetails")
            .<Safety, Safety>chunk(chunkSize)
            //.reader(cursorItemReader())
            .reader(safetyPagingItemReader())
            .writer(safetyWriter())
            .listener(new StepListener())
            .listener(new ChunkListener())
            .build();
}

@Bean
public Job job() throws Exception {
    return jobBuilderFactory.get("job")
            .start(importSafetyDetails())
            .build();
}

Классы домена выглядят следующим образом:

@NoArgsConstructor
@AllArgsConstructor
@Data
public class Safety {    
    private int id;
}


public class SafetyRowMapper implements RowMapper<Safety> {

    @Override
    public Safety mapRow(ResultSet resultSet, int i) throws SQLException {
        if(resultSet.next()) {
            Safety safety = new Safety();
            safety.setId(resultSet.getInt("id"));
            return safety;
        }
        return null;
    }
}

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchSamplesApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchSamplesApplication.class, args);
    }

}

application.yml конфигурируется следующим образом:

spring:
  application:
    name: spring-batch-samples
  main:
    allow-bean-definition-overriding: true
  datasource:
    url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
    username: sa
    password:
    driver-class-name: org.h2.Driver
    hikari:
      connection-timeout: 20000
      maximum-pool-size: 10
  h2:
    console:
      enabled: true
  batch:
    initialize-schema: never

server:
  port: 9090

sqls, как показано ниже:

CREATE TABLE safety (
  id int NOT NULL,
  CONSTRAINT PK_ID PRIMARY KEY (id)
);

INSERT INTO safety (id) VALUES (1);
...100 records are inserted

Классы слушателей, как показано ниже:

@Slf4j
public class StepListener{

    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        log.info("In step {} ,Exit Status: {} ,Read Records: {} ,Committed Records: {} ,Skipped Read Records: {} ,Skipped Write Records: {}",
                stepExecution.getStepName(),
                stepExecution.getExitStatus().getExitCode(),
                stepExecution.getReadCount(),
                stepExecution.getCommitCount(),
                stepExecution.getReadSkipCount(),
                stepExecution.getWriteSkipCount());
        return stepExecution.getExitStatus();
    }
}

@Slf4j
public class ChunkListener {

    @BeforeChunk
    public void beforeChunk(ChunkContext context) {
        log.info("<< Before the chunk");
    }

    @AfterChunk
    public void afterChunk(ChunkContext context) {
        log.info("<< After the chunk");
    }

}

1 Ответ

0 голосов
/ 03 мая 2020

Я пытался воспроизвести вашу проблему, но не смог. Возможно, было бы замечательно, если бы вы могли поделиться большим количеством кода.

Тем временем я создал простую работу, чтобы прочитать 100 записей из таблицы «безопасности» и распечатать их на консоли. И это работает нормально .

See ScreenShot.

@SpringBootApplication
@EnableBatchProcessing
public class ReaderWriterProblem implements CommandLineRunner {

    @Autowired
    DataSource dataSource;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private ApplicationContext context;

    public static void main(String[] args) {
        String[] arguments = new String[]{LocalDateTime.now().toString()};
        SpringApplication.run(ReaderWriterProblem.class, arguments);
    }

    @Bean
    public ItemWriter<Safety> safetyWriter() {
        return new ItemWriter<Safety>() {
            @Override
            public void write(List<? extends Safety> items) throws Exception {

                for (Safety item : items) {
                    //log.info(item.toString());
                    System.out.println(item);
                }

            }
        };
    }

    //    @Bean
    //    public JdbcCursorItemReader<Safety> cursorItemReader() throws Exception {
    //        JdbcCursorItemReader<Safety> reader = new JdbcCursorItemReader<>();
    //
    //        reader.setSql("select * from safety ");
    //        reader.setDataSource(dataSource);
    //        reader.setRowMapper(new SafetyRowMapper());
    //        reader.setVerifyCursorPosition(false);
    //        reader.afterPropertiesSet();
    //
    //        return reader;
    //    }

    @Bean
    JdbcPagingItemReader<Safety> safetyPagingItemReader() throws Exception {
        JdbcPagingItemReader<Safety> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(dataSource);
        reader.setFetchSize(10);
        reader.setRowMapper(new SafetyRowMapper());

        PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
        queryProvider.setSelectClause("*");
        queryProvider.setFromClause("safety");

        Map<String, Order> sortKeys = new HashMap<>(1);

        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);

        return reader;
    }

    @Bean
    public Step importSafetyDetails() throws Exception {
        return stepBuilderFactory.get("importSafetyDetails")
                .<Safety, Safety>chunk(5)
                //.reader(cursorItemReader())
                .reader(safetyPagingItemReader())
                .writer(safetyWriter())
                .listener(new MyStepListener())
                .listener(new MyChunkListener())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .listener(new JobListener())
                .start(importSafetyDetails())
                .build();
    }

    @Override
    public void run(String... args) throws Exception {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();

        jobParametersBuilder.addString("date", LocalDateTime.now().toString());

        try {

            Job job = (Job) context.getBean("job");

            jobLauncher.run(job, jobParametersBuilder.toJobParameters());

        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
            e.printStackTrace();
        }
    }

    public static class JobListener implements JobExecutionListener {

        @Override
        public void beforeJob(JobExecution jobExecution) {
            System.out.println("Before job");
        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("After job");
        }

    }

    private static class SafetyRowMapper implements RowMapper<Safety> {
        @Override
        public Safety mapRow(ResultSet resultSet, int i) throws SQLException {
            Safety safety = new Safety();
            safety.setId(resultSet.getLong("ID"));
            return safety;
        }
    }

    public static class MyStepListener implements StepExecutionListener {

        @Override
        public void beforeStep(StepExecution stepExecution) {
            System.out.println("Before Step");
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            System.out.println("After Step");
            return ExitStatus.COMPLETED;
        }
    }

    private static class MyChunkListener implements ChunkListener {

        @Override
        public void beforeChunk(ChunkContext context) {
            System.out.println("Before Chunk");
        }

        @Override
        public void afterChunk(ChunkContext context) {
            System.out.println("After Chunk");
        }

        @Override
        public void afterChunkError(ChunkContext context) {

        }
    }
}

Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...