Я пытаюсь написать процесс Spring Batch для преобразования миллионов записей в устаревшей БД с разветвленной схемой в упрощенный формат JSON и публикации этого JSON в GCP PubSub. Чтобы сделать этот процесс как можно более эффективным, я пытаюсь использовать многопоточный шаг Spring-Batch.
Чтобы проверить свой процесс, я начал с малого, с размером страницы и размером блока из 5, ограничение в 20 записей для обработки и пул потоков всего из 1 потока. Я пытаюсь пройти через процесс, чтобы убедиться, что он работает так, как я ожидал, но это не так.
Я ожидал, что настройка моего RepositoryItemReader с размером страницы 5 приведет к тому, что он прочитает только 5 записей из БД, а затем обработает эти записи одним фрагментом из 5 перед чтением следующих 5. Но это не то, что происходит. Вместо этого в журналах, поскольку у меня включен режим hibernate show- sql, я вижу, что считыватель читает ВСЕ 20 записей до начала любой обработки.
Почему мой многопоточный шаг выполняет ВСЕ свое чтение перед выполнением любого обработка? Я неправильно его сконфигурировал? Очевидно, я бы не хотел, чтобы моя работа пыталась загрузить миллионы DTO в память, прежде чем она начнет что-либо обрабатывать ...
Вот как я настроил свою работу:
@Configuration
public class ConversionBatchJobConfig {
@Bean
public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:5}") Integer chunkSize) {
return new SimpleCompletionPolicy(chunkSize);
}
@Bean
@StepScope
public ItemStreamReader<DbProjection> dbReader(
MyDomainRepository myDomainRepository,
@Value("#{jobParameters[pageSize]}") Integer pageSize, //pageSize and chunkSize both 5 for now
@Value("#{jobParameters[limit]}") Integer limit) { //limit is 40
RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
myDomainRepositoryReader.setRepository(myDomainRepository);
myDomainRepositoryReader.setMethodName("findActiveDbDomains"); //A native query
myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
add("ACTIVE");
}});
myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
put("update_date", Sort.Direction.ASC);
}});
myDomainRepositoryReader.setPageSize(pageSize);
myDomainRepositoryReader.setMaxItemCount(limit);
myDomainRepositoryReader.setSaveState(false);
return myDomainRepositoryReader;
}
@Bean
@StepScope
public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) {
return new DbProjectionToJsonMessageConverter(dataRetrievalService);
}
@Bean
@StepScope
public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
return new JsonMessageWriter(publisherService);
}
@Bean
public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
ItemStreamReader<DbProjection> dbReader,
ItemProcessor<DbProjection, JsonMessage> dataConverter,
ItemWriter<JsonMessage> jsonPublisher,
StepBuilderFactory stepBuilderFactory,
TaskExecutor conversionThreadPool,
@Value("${conversion.failure.limit:20}") int maximumFailures) {
return stepBuilderFactory.get("conversionProcess")
.<DbProjection, JsonMessage>chunk(processChunkSize)
.reader(dbReader)
.processor(dataConverter)
.writer(jsonPublisher)
.faultTolerant()
.skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
// ^ for now this returns true for everything until 20 failures
.listener(new MyConversionSkipListener(processStatus))
// ^ for now this just logs the error
.taskExecutor(conversionThreadPool)
.build();
}
@Bean
public Job conversionJob(Step conversionProcess,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get("conversionJob")
.start(conversionProcess)
.build();
}
}