Почему мой многопоточный шаг Spring Batch выполняет все чтения перед любой обработкой? - PullRequest
0 голосов
/ 29 мая 2020

Я пытаюсь написать процесс Spring Batch для преобразования миллионов записей в устаревшей БД с разветвленной схемой в упрощенный формат JSON и публикации этого JSON в GCP PubSub. Чтобы сделать этот процесс как можно более эффективным, я пытаюсь использовать многопоточный шаг Spring-Batch.

Чтобы проверить свой процесс, я начал с малого, с размером страницы и размером блока из 5, ограничение в 20 записей для обработки и пул потоков всего из 1 потока. Я пытаюсь пройти через процесс, чтобы убедиться, что он работает так, как я ожидал, но это не так.

Я ожидал, что настройка моего RepositoryItemReader с размером страницы 5 приведет к тому, что он прочитает только 5 записей из БД, а затем обработает эти записи одним фрагментом из 5 перед чтением следующих 5. Но это не то, что происходит. Вместо этого в журналах, поскольку у меня включен режим hibernate show- sql, я вижу, что считыватель читает ВСЕ 20 записей до начала любой обработки.

Почему мой многопоточный шаг выполняет ВСЕ свое чтение перед выполнением любого обработка? Я неправильно его сконфигурировал? Очевидно, я бы не хотел, чтобы моя работа пыталась загрузить миллионы DTO в память, прежде чем она начнет что-либо обрабатывать ...

Вот как я настроил свою работу:

@Configuration
public class ConversionBatchJobConfig {

    @Bean
    public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:5}") Integer chunkSize) {
        return new SimpleCompletionPolicy(chunkSize);
    }

    @Bean
    @StepScope
    public ItemStreamReader<DbProjection> dbReader(
            MyDomainRepository myDomainRepository,
            @Value("#{jobParameters[pageSize]}") Integer pageSize, //pageSize and chunkSize both 5 for now
            @Value("#{jobParameters[limit]}") Integer limit) { //limit is 40
        RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
        myDomainRepositoryReader.setRepository(myDomainRepository);
        myDomainRepositoryReader.setMethodName("findActiveDbDomains"); //A native query
        myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
            add("ACTIVE");
        }});
        myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
            put("update_date", Sort.Direction.ASC);
        }});
        myDomainRepositoryReader.setPageSize(pageSize);
        myDomainRepositoryReader.setMaxItemCount(limit);
        myDomainRepositoryReader.setSaveState(false);
        return myDomainRepositoryReader;
    }

    @Bean
    @StepScope
    public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) {
        return new DbProjectionToJsonMessageConverter(dataRetrievalService);
    }

    @Bean
    @StepScope
    public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
        return new JsonMessageWriter(publisherService);
    }

    @Bean
    public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                  ItemStreamReader<DbProjection> dbReader,
                                  ItemProcessor<DbProjection, JsonMessage> dataConverter,
                                  ItemWriter<JsonMessage> jsonPublisher,
                                  StepBuilderFactory stepBuilderFactory,
                                  TaskExecutor conversionThreadPool,
                                  @Value("${conversion.failure.limit:20}") int maximumFailures) {
        return stepBuilderFactory.get("conversionProcess")
                .<DbProjection, JsonMessage>chunk(processChunkSize)
                .reader(dbReader)
                .processor(dataConverter)
                .writer(jsonPublisher)
                .faultTolerant()
                .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
                            //  ^ for now this returns true for everything until 20 failures
                    .listener(new MyConversionSkipListener(processStatus))
                              //  ^ for now this just logs the error
                .taskExecutor(conversionThreadPool)
                .build();
    }

    @Bean
    public Job conversionJob(Step conversionProcess,
                             JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("conversionJob")
                .start(conversionProcess)
                .build();
    }
}

1 Ответ

1 голос
/ 02 июня 2020

Вам необходимо проверить значение hibernate.jdbc.fetch_size и установить его соответствующим образом.

pageSize и fetchSize - это разные параметры. Вы можете найти более подробную информацию о различиях здесь: { ссылка }. Итак, в вашем случае, если fetchSize больше, чем pageSize, возможно, что будет получено больше записей, чем размер страницы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...