Мой многопоточный Spring Batch Step работает почти беспорядочно. Я не смог распознать какую-либо закономерность в ее неудачах. Иногда он читает и записывает слишком много записей из базы данных, а иногда недостаточно.
Я использую RepositoryItemReader для выполнения собственного собственного запроса. Я определил для него countQuery и использовал метод читателя setMaxItemCount(totalLimit)
, но, похоже, он считает это скорее предложением, чем фактическим жестким максимумом. Поскольку при количестве потоков 4 и только 1 намеренно плохая запись, вызывающая 1 пропуск в логе процессора c, я видел ...
limit | pageSize | chunkSize || actual writes
100 | 10 | 5 || 110 unique writes
800 | 100 | 25 || 804 unique writes, and 37 duplicate writes (WHY?)
800 | 100 | 25 || 663 unique writes, and 165 duplicate writes (WHYYYY???)
Мой проект использует Spring Boot 2.1. 11.RELEASE, и похоже, что версия инфраструктуры spring-batch-in, которая втягивается, - это 4.1.3.RELEASE. Кто-нибудь знает, почему Spring Batch выполняет слишком много или дублирует записи, когда на одной из страниц происходит всего 1 пропуск?
Может быть, это как-то связано с тем, как я настроил свой JobRepository в памяти ...
Вот мой класс репозитория:
@Repository
public interface MyEntityRepository extends JpaRepository<MyEntity, Integer> {
String FROM_MY_ENTITY_TABLE_LEFT_JOINED_WITH_ANOTHER_TABLE = "from {h-schema}my_entity e" +
"left join {h-schema}another_table a" +
"on e.fk = a.pk ";
@Query(
value = "select e.id, e.name, a.additional_info" +
FROM_MY_ENTITY_TABLE_LEFT_JOINED_WITH_ANOTHER_TABLE +
"where e.status <> :status and e.add_date < :date",
countjQuery = "select count(*) " +
FROM_MY_ENTITY_TABLE_LEFT_JOINED_WITH_ANOTHER_TABLE +
"where e.status <> :status and e.add_date < :date",
nativeQuery = true)
Page<MyProjection> findMyProjectionsWithoutStatusBeforeDate(@Param("status") String status,
@Param("date") Date date,
Pageable page);
}
И вот как Я настроил свою работу:
@Configuration
public class ConversionBatchJobConfig {
@Bean
public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:5}") Integer chunkSize) {
return new SimpleCompletionPolicy(chunkSize);
}
@Bean
@StepScope
public ItemStreamReader<MyProjection> dbReader(
MyEntityRepository myEntityRepository,
@Value("#{jobParameters[startTime]}") Date startTime,
@Value("#{jobParameters[pageSize]}") Integer pageSize,
@Value("#{jobParameters[limit]}") Integer limit) {
RepositoryItemReader<MyProjection> myProjectionRepositoryReader = new RepositoryItemReader<>();
myProjectionRepositoryReader.setRepository(myEntityRepository);
myProjectionRepositoryReader.setMethodName("findMyProjectionsWithoutStatusBeforeDate");
myProjectionRepositoryReader.setArguments(new ArrayList<Object>() {{
add("REMOVED");
add(startTime);
}});
myProjectionRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
put("e.id", Sort.Direction.ASC);
}});
myProjectionRepositoryReader.setPageSize(pageSize);
myProjectionRepositoryReader.setMaxItemCount(limit);
myProjectionRepositoryReader.setSaveState(false);
return myProjectionRepositoryReader;
}
@Bean
@StepScope
public ItemProcessor<MyProjection, JsonMessage> dataConverter(AdditionalDbDataRetrievalService dataRetrievalService) {
return new MyProjectionToJsonMessageConverter(dataRetrievalService); // <== simple ItemProcessor implementation
}
@Bean
@StepScope
public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
return new JsonMessageWriter(publisherService); // <== simple ItemWriter implementation
}
@Bean
public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
ItemStreamReader<MyProjection> dbReader,
ItemProcessor<MyProjection, JsonMessage> dataConverter,
ItemWriter<JsonMessage> jsonPublisher,
StepBuilderFactory stepBuilderFactory,
TaskExecutor conversionThreadPool,
@Value("${conversion.failure.limit:20}") int maximumFailures) {
return stepBuilderFactory.get("conversionProcess")
.<MyProjection, JsonMessage>chunk(processChunkSize)
.reader(dbReader)
.processor(dataConverter)
.writer(jsonPublisher)
.faultTolerant()
.skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
// ^ for now this returns true for everything until 20 failures
.listener(new MyConversionSkipListener(processStatus))
// ^ for now this just logs the error
.taskExecutor(conversionThreadPool)
.build();
}
@Bean
public Job conversionJob(Step conversionProcess,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get("conversionJob")
.start(conversionProcess)
.build();
}
}
А вот как я настроил свой репозиторий заданий в памяти:
@Configuration
@EnableBatchProcessing
public class InMemoryBatchManagementConfig {
@Bean
public ResourcelessTransactionManager resourcelessTransactionManager() {
ResourcelessTransactionManager resourcelessTransactionManager = new ResourcelessTransactionManager();
return resourcelessTransactionManager;
}
@Bean
public MapJobRepositoryFactoryBean mapJobRepositoryFactory(ResourcelessTransactionManager resourcelessTransactionManager)
throws Exception {
MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean(resourcelessTransactionManager);
factory.afterPropertiesSet();
return factory;
}
@Bean
public JobRepository jobRepository(MapJobRepositoryFactoryBean factory) throws Exception {
return factory.getObject();
}
@Bean
public SimpleJobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
SimpleJobLauncher launcher = new SimpleJobLauncher();
launcher.setJobRepository(jobRepository);
launcher.afterPropertiesSet();
return launcher;
}
@Bean
public JobExplorer jobExplorer(MapJobRepositoryFactoryBean factory) {
return new SimpleJobExplorer(factory.getJobInstanceDao(), factory.getJobExecutionDao(),
factory.getStepExecutionDao(), factory.getExecutionContextDao());
}
@Bean
public BatchConfigurer batchConfigurer(MapJobRepositoryFactoryBean mapJobRepositoryFactory,
ResourcelessTransactionManager resourceslessTransactionManager,
SimpleJobLauncher jobLauncher,
JobExplorer jobExplorer) {
return new BatchConfigurer() {
@Override
public JobRepository getJobRepository() throws Exception {
return mapJobRepositoryFactory.getObject();
}
@Override
public PlatformTransactionManager getTransactionManager() throws Exception {
return resourceslessTransactionManager;
}
@Override
public JobLauncher getJobLauncher() throws Exception {
return jobLauncher;
}
@Override
public JobExplorer getJobExplorer() throws Exception {
return jobExplorer;
}
};
}
}
EDIT
Удалось заставить Spring Batch работать с базой данных H2 вместо репозитория Map, но я все еще вижу ту же проблему. Вот как я настроил пакет для использования H2:
Я импортировал драйвер H2:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.200</version>
</dependency>
Я настроил свою основную конфигурацию БД, чтобы указывать на мои объекты JPA:
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(basePackages = "com.company.project.jpa.repository", transactionManagerRef = "transactionManager")
@EntityScan(basePackages = "com.company.project.jpa.entity")
public class DbConfig {
@Bean
@Primary
@ConfigurationProperties("oracle.datasource")
public DataSource dataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@Primary
public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource,
EntityManagerFactoryBuilder builder) {
return builder.dataSource(dataSource).packages("com.company.project.jpa").build();
}
@Bean
@Primary
public PlatformTransactionManager transactionManager(
@Qualifier("entityManagerFactory") LocalContainerEntityManagerFactoryBean entityManagerFactory) {
return new JpaTransactionManager(entityManagerFactory.getObject());
}
}
Затем я настроил управление пакетами в памяти следующим образом:
@Configuration
@EnableBatchProcessing
public class InMemoryBatchManagementConfig {
@Bean(destroyMethod = "shutdown")
public EmbeddedDatabase h2DataSource() {
return new EmbeddedDatabaseBuilder().setType(EmbeddedDatabaseType.H2)
.addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
.addScript("classpath:org/springframework/batch/core/schema-h2.sql")
.build();
}
@Bean
public LocalContainerEntityManagerFactoryBean h2EntityManagerFactory(EmbeddedDatabase h2DataSource,
EntityManagerFactoryBuilder builder) {
return builder.dataSource(h2DataSource).packages("org.springframework.batch.core").build();
}
@Bean
public PlatformTransactionManager h2TransactionManager(
@Qualifier("h2EntityManagerFactory") LocalContainerEntityManagerFactoryBean h2EntityManagerFactory) {
return new JpaTransactionManager(h2EntityManagerFactory.getObject());
}
@Bean
public JobRepository jobRepository(EmbeddedDatabase h2DataSource,
@Qualifier("h2TransactionManager") PlatformTransactionManager h2TransactionManager) throws Exception {
final JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDatabaseType(DatabaseType.H2.getProductName());
factory.setDataSource(h2DataSource);
factory.setTransactionManager(h2TransactionManager);
return factory.getObject();
}
@Bean
public SimpleJobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Bean
public JobRepositoryFactoryBean jobRepositoryFactoryBean(EmbeddedDatabase h2DataSource,
@Qualifier("h2TransactionManager") PlatformTransactionManager h2TransactionManager) {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(h2DataSource);
jobRepositoryFactoryBean.setTransactionManager(h2TransactionManager);
return jobRepositoryFactoryBean;
}
@Bean
public BatchConfigurer batchConfigurer(JobRepository jobRepository,
SimpleJobLauncher jobLauncher,
@Qualifier("h2TransactionManager") PlatformTransactionManager h2TransactionManager,
JobExplorer jobExplorer) {
return new BatchConfigurer() {
@Override
public JobRepository getJobRepository() {
return jobRepository;
}
@Override
public PlatformTransactionManager getTransactionManager() {
return h2TransactionManager;
}
@Override
public JobLauncher getJobLauncher() {
return jobLauncher;
}
@Override
public JobExplorer getJobExplorer() {
return jobExplorer;
}
};
}
}