Весенняя партия JPAItemReader Производительность Проблема - PullRequest
0 голосов
/ 25 января 2019

Ниже приведена конфигурация моего весеннего пакетного задания, которое принимает записи из БД, выполняет некоторую обработку в процессоре элементов, обновляет столбец состояния и записывает обратно в БД.

Когда я бегал по 10 тыс. Записей, я мог видеть, как он берет каждую запись одну за другой и обновляет статус таким же образом.Первоначально я планировал использовать многопоточность, но это не имеет никакого смысла, так как моя работа выполняется один раз в день с количеством записей от 10 до 100 тыс.(Записи составляют менее 5 тыс. В большинстве дней, и очень мало дней в году (от 5 до 10 дней) до 50–100 тыс.).

Я не хочу добавлять больше процессоров и получать оплату от Kubernetes только на 10 дней в году.Теперь проблема в том, что когда я запустил это задание, для каждого запроса на выборку требуется всего 100 записей, а не по 100 за раз.Кроме того, обновление - это также одна запись за раз, и для обработки 10 тыс. Записей требуется 10 минут, что очень медленно.

Как можно быстрее читать, обрабатывать и записывать? Я могу избавиться от многопоточности и время от времени немного больше загружать процессор.Дополнительная информация представлена ​​в виде комментариев в коде.

@Configuration
@EnableBatchProcessing
public class BatchConfiguration extends DefaultBatchConfigurer{

public final static Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

@Autowired
JobBuilderFactory jobBuilderFactory;

@Autowired
StepBuilderFactory stepBuilderFactory;

@Autowired
MyRepository myRepository;


@Autowired
private EntityManagerFactory entityManagerFactory;

@Value("${chunk-size}")
private int chunkSize;

@Value("${max-threads}")
private int maxThreads;

private final DataSource dataSource;


/**
 * @param dataSource
 * Override to do not set datasource even if a datasource exist during intialization.
 * Initialize will use a Map based JobRepository (instead of database) for Spring batch meta tables
 */
@Override
public void setDataSource(DataSource dataSource) {
}

@Override
public PlatformTransactionManager getTransactionManager() {
    return jpaTransactionManager();
}


@Autowired
public BatchConfiguration(@Qualifier("dataSource") DataSource dataSource) {
    this.dataSource = dataSource;
}

@Bean
public JpaTransactionManager jpaTransactionManager() {
    final JpaTransactionManager transactionManager = new JpaTransactionManager();
    transactionManager.setDataSource(dataSource);
    return transactionManager;
}


@Bean
@StepScope
public JdbcPagingItemReader<ModelEntity> importReader() {  // I tried using RepositoryItemReader but records were skipped by JPA hence I went for JdbcPagingItemReader
    JdbcPagingItemReader<ModelEntity> reader = new JdbcPagingItemReader<ModelEntity>();
    final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
    sqlPagingQueryProviderFactoryBean.setDataSource( dataSource );
    sqlPagingQueryProviderFactoryBean.setSelectClause( "SELECT *" );
    sqlPagingQueryProviderFactoryBean.setFromClause( "FROM mytable" );
    sqlPagingQueryProviderFactoryBean.setWhereClause( "WHERE STATUS = 'myvalue' " );
    sqlPagingQueryProviderFactoryBean.setSortKey( "primarykey" );
    try {
        reader.setQueryProvider( sqlPagingQueryProviderFactoryBean.getObject() );
    } catch (Exception e) {
        e.printStackTrace();
    }
    reader.setDataSource( dataSource );
    reader.setPageSize( chunkSize );
    reader.setSaveState( Boolean.FALSE );
    reader.setRowMapper( new BeanPropertyRowMapper<ModelEntity>(ModelEntity.class ) );
    return reader;
}



@Bean
public ItemWriter<ModelEntity> databaseWriter() {
    RepositoryItemWriter<ModelEntity> repositoryItemWriter=new RepositoryItemWriter<>();
    repositoryItemWriter.setRepository(myRepository);
    repositoryItemWriter.setMethodName("save");
    return repositoryItemWriter;
}

@Bean
public Myprocessor myprocessor() { 
    return new Myprocessor();
}

@Bean
public JobExecutionListener jobExecutionListener() {
    return new JobExecutionListener();
}

@Bean
public StepExecutionListener stepExecutionListener() {
    return new StepExecutionListener();
}

@Bean
public ChunkExecutionListener chunkListener() {
    return new ChunkExecutionListener();
}

@Bean
public TaskExecutor taskExecutor() {
 SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
 taskExecutor.setConcurrencyLimit(maxThreads);
return taskExecutor;
}

@Bean
public Job processJob() {
    return jobBuilderFactory.get("myjob")
            .incrementer(new RunIdIncrementer())
            .start(processStep())
            .listener(jobExecutionListener())
            .build();
}

@Bean
public Step processStep() {
    return stepBuilderFactory.get("processStep")
            .<ModelEntity,ModelEntity>chunk(chunkSize)
            .reader(importReader())
            .processor(myprocessor())
            .writer(databaseWriter())
            .taskExecutor(taskExecutor())
            .listener(stepExecutionListener())
            .listener(chunkListener())
            .transactionManager(getTransactionManager())
            .throttleLimit(maxThreads)
            .build();
    }

}

Я использую репозиторий JpaRepository и код ниже.(При условии, что метод сохранения его родительского класса CrudRepository сделает сохранение)

public interface MyRepository extends JpaRepository<ModelEntity, BigInteger> {

}

Процессор такой, как показано ниже

@Component
public class Myprocessor implements ItemProcessor<Myprocessor,Myprocessor> {

@Override
public ModelEntity process(ModelEntity modelEntity) throws Exception {
    try {
    // This is fast and working fine
       if ((myProcessing)) {
            modelEntity.setStatus(success);
        } else {
            modelEntity.setStatus(failed);
        }
    }
    catch (Exception e){
        logger.info( "Exception occurred while processing"+e );
      }
    return modelEntity;
 }

 // This is fast and working fine
 public Boolean myProcessing(ModelEntity modelEntity){
 //Processor Logic Here
    return processingStatus;
 }

 }

Файл свойств ниже

logging.level.org.hibernate.SQL=DEBUG
logging.level.com.zaxxer.hikari.HikariConfig=DEBUG
logging.level.org.hibernate.type.descriptor.sql.BasicBinder=TRACE 
logging.level.org.springframework.jdbc.core.JdbcTemplate=DEBUG
logging.level.org.springframework.jdbc.core.StatementCreatorUtils=TRACE


spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.url=url
spring.datasource.username=username
spring.datasource.password=password 
spring.jpa.hibernate.connection.provider_class
=org.hibernate.hikaricp.internal.HikariCPConnectionProvider
spring.jpa.database-platform=org.hibernate.dialect.Oracle10gDialect
spring.jpa.show-sql=false
spring.main.allow-bean-definition-overriding=true
spring.batch.initializer.enabled=false
spring.batch.job.enabled=false
spring.batch.initialize-schema=never 
chunk-size=100
max-threads=5

Ответы [ 2 ]

0 голосов
/ 29 января 2019

Спасибо всем за предложения. Я нашел проблему сам. Я использовал JdbcPagingItemReader и RepositoryItemWriter. Считыватель работал должным образом, но писатель запускал запрос на выборку для каждой записи, передаваемой после процессора. Я полагаю, что причина в том, что запись сохраняется в JPA только после процессора, поскольку считыватель не является стандартным считывателем JPA. Я не уверен в этом, хотя. Но изменение записывающего устройства на JdbcBatchItemWriter устранило проблему.

0 голосов
/ 26 января 2019

Можно включить пакетную обработку JDBC для операторов INSERT, UPDATE и DELETE только с одним свойством конфигурации:

spring.jpa.properties.hibernate.jdbc.batch_size 

Определяет количество обновлений, которые отправляются в базу данных за один раз для выполнения.

Подробнее см. эту ссылку

...