Задание с одним шагом выполняет 2 чанка, даже когда из базы данных возвращается ноль элементов, и один из этих 2 чанков не закрывает соединение - PullRequest
0 голосов
/ 10 марта 2020

После 50 выполнений задания (размер пула JDBC-соединений) пул оказывается пустым. Я определил только 1 шаг с чанком, читателем, процессором и писателем.

Я задал запрос так, чтобы он ничего не возвращал, и всё равно почему-то выполнились 2 чанка. У меня также была проблема с многопоточным ридером, поэтому я установил throttleLimit в 1. Кроме того, я запускаю следующее выполнение задания только после завершения текущего, а не раньше (что могло бы случиться при запуске по расписанию). Поэтому я запускаю задание в методе @PostConstruct и вызываю следующее выполнение после завершения задания (независимо от результата) в SimpleJobExecutionListener.

JobMain

/**
 * Entry point that launches the batch job.
 *
 * <p>{@link #launchJob()} runs once after bean construction (via
 * {@code @PostConstruct}) and is also invoked by
 * {@code SimpleJobExecutionListener.afterJob} to start the next execution.
 */
@Slf4j
@Component
public class JobMain {

    private final Job job;
    private final JobLauncher jobLauncher;

    public JobMain(Job job,
                   JobLauncher jobLauncher) {
        this.job = job;
        this.jobLauncher = jobLauncher;
    }

    /**
     * Launches the job with a single string parameter {@code jobNameRegister}
     * whose value is the current time in epoch milliseconds, so that every
     * launch gets a distinct set of job parameters.
     *
     * @throws JobParametersInvalidException       propagated from {@code jobLauncher.run}
     * @throws JobExecutionAlreadyRunningException propagated from {@code jobLauncher.run}
     * @throws JobRestartException                 propagated from {@code jobLauncher.run}
     * @throws JobInstanceAlreadyCompleteException propagated from {@code jobLauncher.run}
     */
    @Retryable
    @PostConstruct
    public void launchJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        // The original referenced an undeclared `instant`; the current epoch-millis
        // timestamp is taken directly instead.
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("jobNameRegister", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        jobLauncher.run(job, jobParameters);
    }
}

Пакетная конфигурация:

/**
 * Spring Batch configuration: declares the job, its single chunk-oriented
 * step and the task executor the step runs its chunks on.
 */
@Configuration
@EnableBatchProcessing
@EnableScheduling
@Slf4j
public class BatchConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private final DocumentReader documentReader;
    private final RegisterProcessor registerProcessor;
    private final DocumentWriter documentWriter;

    private final SimpleJobExecutionListener simpleJobExecutionListener;

    // This field was assigned in the constructor and used in step() but was
    // never declared.
    private final ChunkListener chunkListener;

    public BatchConfiguration(
            JobBuilderFactory jobBuilderFactory,
            StepBuilderFactory stepBuilderFactory,
            DocumentReader documentReader,
            RegisterProcessor registerProcessor,
            DocumentWriter documentWriter,
            SimpleJobExecutionListener simpleJobExecutionListener,
            ChunkListener chunkListener) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.documentReader = documentReader;
        this.registerProcessor = registerProcessor;
        this.documentWriter = documentWriter;
        this.simpleJobExecutionListener = simpleJobExecutionListener;
        this.chunkListener = chunkListener;
    }

    /**
     * Builds the job "jobNameRegister": a flow consisting of {@link #step()},
     * with a {@code RunIdIncrementer} and the job execution listener attached.
     */
    @Bean
    public Job registrationChunkJob() {
        return jobBuilderFactory.get("jobNameRegister")
                .incrementer(new RunIdIncrementer())
                .listener(simpleJobExecutionListener)
                .flow(step()).end().build();
    }

    /**
     * Thread pool handed to the step: 5 core threads, at most 20, queue
     * capacity 4; core threads may time out and shutdown waits for running
     * tasks.
     */
    @Bean
    TaskExecutor taskExecutorStepPush() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setQueueCapacity(4);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return taskExecutor;
    }

    /**
     * Builds the step "stepNameRegister": chunks of 10 items read by
     * {@code documentReader}, passed through {@code registerProcessor} and
     * written by {@code documentWriter}. The step uses
     * {@link #taskExecutorStepPush()} with a throttle limit of 1 and runs each
     * chunk with REQUIRED propagation and READ_COMMITTED isolation.
     */
    @Bean
    public Step step() {
        DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
        attribute.setPropagationBehavior(Propagation.REQUIRED.value());
        attribute.setIsolationLevel(Isolation.READ_COMMITTED.value());

        return stepBuilderFactory.get("stepNameRegister").<Object, Object>chunk(10)
                .reader(documentReader)
                .processor(registerProcessor)
                .writer(documentWriter)
                .listener(chunkListener)
                .taskExecutor(taskExecutorStepPush())
                .throttleLimit(1)
                .transactionAttribute(attribute)
                .build();
    }

}

SimpleJobExecutionListener

@Component
@Slf4j
public class SimpleJobExecutionListener implements JobExecutionListener {

private final JobMain jobMain;

public SimpleJobExecutionListener(@Lazy JobMain jobMain) {
    this.jobMain = jobMain;
    log.debug("SimpleJobExecutionListener initialize");
}

@Override
public void afterJob(JobExecution jobExecution) {
    try {
        Thread.sleep(30000);
    } catch (InterruptedException e) {
        throw new RuntimeException();
    }
    try {
        jobMain.launchJob();
        } catch (JobParametersInvalidException | JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException e) {
            throw new RuntimeException();
        }
}

@Override
public void beforeJob(JobExecution jobExecution) {
    log.debug("SimpleJobExecutionListener before Job");
}

} ​​

ChunkListener

@Slf4j

@ Component publi c класс CocoaChunkTransactionIntegration {

public CocoaChunkTransactionIntegration() {
    log.debug("CocoaChunkTransactionIntegration initialized");
}

@BeforeChunk
public void onChunkStart(ChunkContext context) throws IOException {
    log.debug(t+" - onChunkStart,"+context.getStepContext().getJobParameters().get("jobNameRegister"));
}

@AfterChunk
public void onChunkEnd(ChunkContext context) {
    log.debug("onChunkEnd,"+context.getStepContext().getJobParameters().get("jobNameRegister"));
}

@AfterChunkError
public void onChunkError(ChunkContext context) {
    Exception e = (Exception) context.getAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY);
    log.error("onChunkError,"+context.getStepContext().getJobParameters().get("jobNameRegister"), e);
}
}

DocumentReader

/**
 * Item reader that delegates to a {@code JdbcCursorItemReader} created and
 * opened for each step execution.
 *
 * <p>The delegate is opened in {@link #beforeStep} and closed in
 * {@link #afterStep}. Without the close, every step execution leaves its
 * cursor open.
 *
 * <p>NOTE(review): this listing declares {@code ItemReader<VwCocoa>} but uses
 * {@code Object} for the delegate and for {@code read()}, and calls
 * {@code getBid()} on that value. The item types look partially anonymised;
 * confirm against the real code.
 */
@Slf4j
@Component
public class DocumentReader implements ItemReader<VwCocoa>, StepExecutionListener {

    private final DataSource dataSource;
    private JdbcCursorItemReader<Object> customerJdbcCursorItemReader;
    private ExecutionContext executionContext;

    public DocumentReader(DataSource dataSource) {
        this.dataSource = dataSource;
        log.debug("Reader initialize");
    }

    /** Creates, configures and opens a fresh cursor reader for this step execution. */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        customerJdbcCursorItemReader = new JdbcCursorItemReader<>();
        customerJdbcCursorItemReader.setDataSource(dataSource);
        customerJdbcCursorItemReader.setSql("SELECT * FROM TABLE WHERE 1 = 0");
        customerJdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Object.class));
        executionContext = new ExecutionContext();
        customerJdbcCursorItemReader.open(executionContext);
        log.debug("Reader before step");
    }

    /**
     * Closes the cursor reader opened in {@link #beforeStep}.
     *
     * @return {@code null}, as before, so this listener contributes no exit status
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (customerJdbcCursorItemReader != null) {
            // The reader was opened manually, so it must be closed manually too.
            customerJdbcCursorItemReader.close();
        }
        return null;
    }

    /**
     * Reads the next item from the delegate. Synchronized because the step
     * runs its chunks on a task executor.
     *
     * @return the next item, or {@code null} when the cursor is exhausted
     */
    @Override
    public synchronized Object read() throws Exception {
        Object object = customerJdbcCursorItemReader.read();
        if (object != null) {
            log.debug("Reader read " + object.getBid());
        } else {
            log.debug("Items readed!!!");
        }
        return object;
    }
}

RegisterProcessor

/**
 * Pass-through item processor: every item is handed on to the writer
 * unchanged.
 */
@Slf4j
@Component
public class RegisterProcessor implements ItemProcessor<Object, Object> {

    /**
     * Returns the incoming item as-is.
     *
     * @param object the item read by the reader
     * @return the same instance that was passed in
     */
    @Override
    public Object process(Object object) throws Exception {
        // The identity step formerly done by a private helper is inlined here.
        final Object processed = object;
        return processed;
    }
}

DocumentWriter

/**
 * Item writer that applies one batched SQL update per chunk through a
 * {@code NamedParameterJdbcTemplate}.
 */
@Slf4j
@Component
public class DocumentWriter implements ItemWriter<Object> {

    /** Update statement; the named parameters are bound from each item's properties. */
    private static final String UPDATE_SQL =
            "update OBJECT set PROP1 = :prop1, PROP2 = :prop2 where PROP3 = :prop3";

    private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;

    public DocumentWriter(NamedParameterJdbcTemplate namedParameterJdbcTemplate) {
        this.namedParameterJdbcTemplate = namedParameterJdbcTemplate;
        log.debug("Writer initialize");
    }

    /**
     * Runs the update once for every item of the chunk as a single JDBC batch.
     *
     * @param list the items of the current chunk
     */
    @Override
    public void write(List<? extends Object> list) throws IOException {
        SqlParameterSource[] batch = SqlParameterSourceUtils.createBatch(list.toArray());
        // The per-row update counts were stored in an unused local; the result
        // is now simply discarded.
        namedParameterJdbcTemplate.batchUpdate(UPDATE_SQL, batch);
        log.info("End of write");
    }

}

А вот мой журнал, из которого видно, что чанк выполнялся в 2 потоках:

[ (self-tuning)'] MySpringBootApp    : Starting MySpringBootApp on D02DI1859551CMP with PID 26124 (C:\...\Oracle_Home\user_projects\domains\base_domain\servers\AdminServer\tmp\_WL_user\app-1.0.0-SNAPSHOT\jpepki\war\WEB-INF\lib\_wl_cls_gen.jar started by me in C:\...\Oracle_Home\user_projects\domains\base_domain)
[ (self-tuning)'] **MySpringBootApp**    : Running with Spring Boot v2.1.9.RELEASE, Spring v5.1.4.RELEASE
[ (self-tuning)'] **MySpringBootApp**    : The following profiles are active: dev
[ (self-tuning)'] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type         [org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration$$EnhancerBySpringCGLIB$$bed3107e] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[ (self-tuning)'] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 1727 ms
[ (self-tuning)'] **DocumentReader**    : Reader initialize
[ (self-tuning)'] **ChunkListener** : ChunkListener initialized
[ (self-tuning)'] **RegisterProcessor**    : Processor initialize
[ (self-tuning)'] **DocumentWriter**    : Writer initialize
[ (self-tuning)'] **SimpleJobExecutionListener** : SimpleJobExecutionListener initialize
[ (self-tuning)'] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'taskExecutorStepPush'
[ (self-tuning)'] **JobMain**    : 1583847838770,51, starting a job!
[ (self-tuning)'] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: ORACLE
[ (self-tuning)'] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
[ (self-tuning)'] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=jobEdmaRegister]] launched with the following parameters: [{jobNameRegister=1583847838770}]
[ (self-tuning)'] SimpleJobExecutionListener : SimpleJobExecutionListener before Job
[ (self-tuning)'] o.s.batch.core.job.SimpleStepHandler     : Executing step: [stepNameRegister]
[ (self-tuning)'] **DocumentReader**    : Reader before step
[cutorStepPush-1] **ChunkListener** : 1583847839661 - onChunkStart,1583847838770
[cutorStepPush-1] **DocumentReader**    : Items readed!!!
[cutorStepPush-1] **ChunkListener** : onChunkEnd,1583847838770
[cutorStepPush-2] **ChunkListener** : 1583847839699 - onChunkStart,1583847838770
[cutorStepPush-2] **DocumentReader**    : Items readed!!!
[cutorStepPush-2] **ChunkListener** : onChunkEnd,1583847838770
[ (self-tuning)'] SimpleJobExecutionListener : 1583847839782,[ACTIVE] ExecuteThread: '6' for queue: 'weblogic.kernel.Default (self-tuning)'(51), SimpleJobExecutionListener afterJob[COMPLETED],1583847838770
...