Как я могу ограничить общее количество записей базы данных, считываемых RepositoryItemReader в моем многопоточном пошаговом процессе Spring Batch? - PullRequest
0 голосов
/ 30 мая 2020

Мой многопоточный Spring Batch Step работает почти беспорядочно. Я не смог распознать какую-либо закономерность в ее неудачах. Иногда он читает и записывает слишком много записей из базы данных, а иногда недостаточно.

Я использую RepositoryItemReader для выполнения собственного собственного запроса. Я определил для него countQuery и использовал метод читателя setMaxItemCount(totalLimit), но, похоже, он считает это скорее предложением, чем фактическим жестким максимумом. Поскольку при количестве потоков 4 и только 1 намеренно плохая запись, вызывающая 1 пропуск в логе процессора c, я видел ...

limit | pageSize | chunkSize || actual writes
100   | 10       | 5         || 110 unique writes
800   | 100      | 25        || 804 unique writes, and 37 duplicate writes (WHY?)
800   | 100      | 25        || 663 unique writes, and 165 duplicate writes (WHYYYY???)

Мой проект использует Spring Boot 2.1. 11.RELEASE, и похоже, что версия инфраструктуры spring-batch-in, которая втягивается, - это 4.1.3.RELEASE. Кто-нибудь знает, почему Spring Batch выполняет слишком много или дублирует записи, когда на одной из страниц происходит всего 1 пропуск?

Может быть, это как-то связано с тем, как я настроил свой JobRepository в памяти ...

Вот мой класс репозитория:

@Repository
public interface MyEntityRepository extends JpaRepository<MyEntity, Integer> {
    String FROM_MY_ENTITY_TABLE_LEFT_JOINED_WITH_ANOTHER_TABLE = "from {h-schema}my_entity e" +
        "left join {h-schema}another_table a" +
        "on e.fk = a.pk ";

    @Query(
        value = "select e.id, e.name, a.additional_info" +
           FROM_MY_ENTITY_TABLE_LEFT_JOINED_WITH_ANOTHER_TABLE +
           "where e.status <> :status and e.add_date < :date",
        countjQuery = "select count(*) " +
           FROM_MY_ENTITY_TABLE_LEFT_JOINED_WITH_ANOTHER_TABLE +
           "where e.status <> :status and e.add_date < :date",
        nativeQuery = true)
    Page<MyProjection> findMyProjectionsWithoutStatusBeforeDate(@Param("status") String status, 
                                                                @Param("date") Date date,
                                                                Pageable page);
}

И вот как Я настроил свою работу:

@Configuration
public class ConversionBatchJobConfig {

    @Bean
    public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:5}") Integer chunkSize) {
        return new SimpleCompletionPolicy(chunkSize);
    }

    @Bean
    @StepScope
    public ItemStreamReader<MyProjection> dbReader(
            MyEntityRepository myEntityRepository,
            @Value("#{jobParameters[startTime]}") Date startTime,
            @Value("#{jobParameters[pageSize]}") Integer pageSize,
            @Value("#{jobParameters[limit]}") Integer limit) {
        RepositoryItemReader<MyProjection> myProjectionRepositoryReader = new RepositoryItemReader<>();
        myProjectionRepositoryReader.setRepository(myEntityRepository);
        myProjectionRepositoryReader.setMethodName("findMyProjectionsWithoutStatusBeforeDate");
        myProjectionRepositoryReader.setArguments(new ArrayList<Object>() {{
            add("REMOVED");
            add(startTime);
        }});
        myProjectionRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
            put("e.id", Sort.Direction.ASC);
        }});
        myProjectionRepositoryReader.setPageSize(pageSize);
        myProjectionRepositoryReader.setMaxItemCount(limit);
        myProjectionRepositoryReader.setSaveState(false);
        return myProjectionRepositoryReader;
    }

    @Bean
    @StepScope
    public ItemProcessor<MyProjection, JsonMessage> dataConverter(AdditionalDbDataRetrievalService dataRetrievalService) {
        return new MyProjectionToJsonMessageConverter(dataRetrievalService); // <== simple ItemProcessor implementation
    }

    @Bean
    @StepScope
    public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
        return new JsonMessageWriter(publisherService); // <== simple ItemWriter implementation
    }

    @Bean
    public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                  ItemStreamReader<MyProjection> dbReader,
                                  ItemProcessor<MyProjection, JsonMessage> dataConverter,
                                  ItemWriter<JsonMessage> jsonPublisher,
                                  StepBuilderFactory stepBuilderFactory,
                                  TaskExecutor conversionThreadPool,
                                  @Value("${conversion.failure.limit:20}") int maximumFailures) {
        return stepBuilderFactory.get("conversionProcess")
                .<MyProjection, JsonMessage>chunk(processChunkSize)
                .reader(dbReader)
                .processor(dataConverter)
                .writer(jsonPublisher)
                .faultTolerant()
                .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
                            //  ^ for now this returns true for everything until 20 failures
                    .listener(new MyConversionSkipListener(processStatus))
                              //  ^ for now this just logs the error
                .taskExecutor(conversionThreadPool)
                .build();
    }

    @Bean
    public Job conversionJob(Step conversionProcess,
                             JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("conversionJob")
                .start(conversionProcess)
                .build();
    }
}

А вот как я настроил свой репозиторий заданий в памяти:

@Configuration
@EnableBatchProcessing
public class InMemoryBatchManagementConfig {

    @Bean
    public ResourcelessTransactionManager resourcelessTransactionManager() {
        ResourcelessTransactionManager resourcelessTransactionManager = new ResourcelessTransactionManager();
        return resourcelessTransactionManager;
    }

    @Bean
    public MapJobRepositoryFactoryBean mapJobRepositoryFactory(ResourcelessTransactionManager resourcelessTransactionManager)
            throws Exception {
        MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean(resourcelessTransactionManager);
        factory.afterPropertiesSet();
        return factory;
    }

    @Bean
    public JobRepository jobRepository(MapJobRepositoryFactoryBean factory) throws Exception {
        return factory.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        launcher.afterPropertiesSet();
        return launcher;
    }

    @Bean
    public JobExplorer jobExplorer(MapJobRepositoryFactoryBean factory) {
        return new SimpleJobExplorer(factory.getJobInstanceDao(), factory.getJobExecutionDao(),
                factory.getStepExecutionDao(), factory.getExecutionContextDao());
    }

    @Bean
    public BatchConfigurer batchConfigurer(MapJobRepositoryFactoryBean mapJobRepositoryFactory,
                                           ResourcelessTransactionManager resourceslessTransactionManager,
                                           SimpleJobLauncher jobLauncher,
                                           JobExplorer jobExplorer) {
        return new BatchConfigurer() {
            @Override
            public JobRepository getJobRepository() throws Exception {
                return mapJobRepositoryFactory.getObject();
            }

            @Override
            public PlatformTransactionManager getTransactionManager() throws Exception {
                return resourceslessTransactionManager;
            }

            @Override
            public JobLauncher getJobLauncher() throws Exception {
                return jobLauncher;
            }

            @Override
            public JobExplorer getJobExplorer() throws Exception {
                return jobExplorer;
            }
        };
    }
}

EDIT

Удалось заставить Spring Batch работать с базой данных H2 вместо репозитория Map, но я все еще вижу ту же проблему. Вот как я настроил пакет для использования H2:

Я импортировал драйвер H2:

<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>1.4.200</version>
</dependency>

Я настроил свою основную конфигурацию БД, чтобы указывать на мои объекты JPA:

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(basePackages = "com.company.project.jpa.repository", transactionManagerRef = "transactionManager")
@EntityScan(basePackages = "com.company.project.jpa.entity")
public class DbConfig {

    @Bean
    @Primary
    @ConfigurationProperties("oracle.datasource")
    public DataSource dataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    @Primary
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource,
                                                                       EntityManagerFactoryBuilder builder) {
        return builder.dataSource(dataSource).packages("com.company.project.jpa").build();
    }

    @Bean
    @Primary
    public PlatformTransactionManager transactionManager(
            @Qualifier("entityManagerFactory") LocalContainerEntityManagerFactoryBean entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory.getObject());
    }
}

Затем я настроил управление пакетами в памяти следующим образом:

@Configuration
@EnableBatchProcessing
public class InMemoryBatchManagementConfig {

    @Bean(destroyMethod = "shutdown")
    public EmbeddedDatabase h2DataSource() {
        return new EmbeddedDatabaseBuilder().setType(EmbeddedDatabaseType.H2)
                .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    @Bean
    public LocalContainerEntityManagerFactoryBean h2EntityManagerFactory(EmbeddedDatabase h2DataSource,
                                                                         EntityManagerFactoryBuilder builder) {
        return builder.dataSource(h2DataSource).packages("org.springframework.batch.core").build();
    }

    @Bean
    public PlatformTransactionManager h2TransactionManager(
            @Qualifier("h2EntityManagerFactory") LocalContainerEntityManagerFactoryBean h2EntityManagerFactory) {
        return new JpaTransactionManager(h2EntityManagerFactory.getObject());
    }

    @Bean
    public JobRepository jobRepository(EmbeddedDatabase h2DataSource,
                                       @Qualifier("h2TransactionManager") PlatformTransactionManager h2TransactionManager) throws Exception {
        final JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDatabaseType(DatabaseType.H2.getProductName());
        factory.setDataSource(h2DataSource);
        factory.setTransactionManager(h2TransactionManager);
        return factory.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    public JobRepositoryFactoryBean jobRepositoryFactoryBean(EmbeddedDatabase h2DataSource,
                                                             @Qualifier("h2TransactionManager") PlatformTransactionManager h2TransactionManager) {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new  JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(h2DataSource);
        jobRepositoryFactoryBean.setTransactionManager(h2TransactionManager);
        return jobRepositoryFactoryBean;
    }

    @Bean
    public BatchConfigurer batchConfigurer(JobRepository jobRepository,
                                           SimpleJobLauncher jobLauncher,
                                           @Qualifier("h2TransactionManager") PlatformTransactionManager h2TransactionManager,
                                           JobExplorer jobExplorer) {
        return new BatchConfigurer() {
            @Override
            public JobRepository getJobRepository() {
                return jobRepository;
            }

            @Override
            public PlatformTransactionManager getTransactionManager() {
                return h2TransactionManager;
            }

            @Override
            public JobLauncher getJobLauncher() {
                return jobLauncher;
            }

            @Override
            public JobExplorer getJobExplorer() {
                return jobExplorer;
            }
        };
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...