После 50 выполнений задания (50 — размер пула JDBC-соединений) пул оказывается пустым. Я определил только один шаг с чанком: читатель, процессор и писатель.
Я задал запрос так, чтобы он ничего не возвращал, и тем не менее почему-то было выполнено 2 чанка. У меня также была проблема с многопоточным ридером, но я установил throttleLimit в 1. Кроме того, я запускаю следующее выполнение задания только тогда, когда текущее завершено, а не раньше (что могло бы случиться при запуске по расписанию). Поэтому я запускаю задание в методе @PostConstruct и вызываю следующее выполнение после завершения задания (независимо от того, как оно завершилось) в SimpleJobExecutionListener.
JobMain
/**
 * Launches the injected batch {@link Job} through the injected {@link JobLauncher}.
 *
 * <p>{@link #launchJob()} runs once when the bean is initialized and is called again by
 * {@code SimpleJobExecutionListener#afterJob} after each execution finishes.
 */
@Slf4j
@Component
public class JobMain {

    /** Name of the single job parameter; its value is the launch timestamp. */
    private static final String RUN_PARAMETER = "jobNameRegister";

    private final Job job;
    private final JobLauncher jobLauncher;

    public JobMain(Job job,
                   JobLauncher jobLauncher) {
        this.job = job;
        this.jobLauncher = jobLauncher;
    }

    /**
     * Starts one execution of the job.
     *
     * <p>The only job parameter is the current time in epoch milliseconds, rendered as a
     * string, so every launch carries a different parameter value.
     *
     * @throws JobParametersInvalidException if the launcher rejects the parameters
     * @throws JobExecutionAlreadyRunningException if the launcher reports a running execution
     * @throws JobRestartException if the launcher reports an illegal restart
     * @throws JobInstanceAlreadyCompleteException if the launcher reports a completed instance
     */
    // NOTE(review): @Retryable is normally applied through a proxy; whether it takes effect
    // on the container-driven @PostConstruct call should be confirmed.
    @Retryable
    @PostConstruct
    public void launchJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        // The original referenced an undeclared `instant`; the wall clock supplies the same
        // epoch-millis value without needing any additional import.
        JobParameters jobParameters = new JobParametersBuilder()
                .addString(RUN_PARAMETER, String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        jobLauncher.run(job, jobParameters);
    }
}
Пакетная конфигурация:
/**
 * Spring Batch configuration: one job ("jobNameRegister") made of a single chunk-oriented
 * step ("stepNameRegister") that is run on a thread-pool task executor.
 */
@Configuration
@EnableBatchProcessing
@EnableScheduling
@Slf4j
public class BatchConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DocumentReader documentReader;
    private final RegisterProcessor registerProcessor;
    private final DocumentWriter documentWriter;
    private final SimpleJobExecutionListener simpleJobExecutionListener;
    // This field was assigned in the constructor and used in step() but never declared.
    private final ChunkListener chunkListener;

    public BatchConfiguration(
            JobBuilderFactory jobBuilderFactory,
            StepBuilderFactory stepBuilderFactory,
            DocumentReader documentReader,
            RegisterProcessor registerProcessor,
            DocumentWriter documentWriter,
            SimpleJobExecutionListener simpleJobExecutionListener,
            ChunkListener chunkListener) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.documentReader = documentReader;
        this.registerProcessor = registerProcessor;
        this.documentWriter = documentWriter;
        this.simpleJobExecutionListener = simpleJobExecutionListener;
        this.chunkListener = chunkListener;
    }

    /**
     * Builds the job: a flow consisting only of {@link #step()}, with a run-id incrementer
     * and the job execution listener attached.
     *
     * @return the configured job
     */
    @Bean
    public Job registrationChunkJob() {
        return jobBuilderFactory.get("jobNameRegister")
                .incrementer(new RunIdIncrementer())
                .listener(simpleJobExecutionListener)
                .flow(step()).end().build();
    }

    /**
     * Thread pool handed to the step: 5 core threads, at most 20 threads, a queue of 4,
     * core threads allowed to time out, and shutdown waits for running tasks.
     *
     * @return the task executor used by {@link #step()}
     */
    @Bean
    TaskExecutor taskExecutorStepPush() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setQueueCapacity(4);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return taskExecutor;
    }

    /**
     * Builds the chunk-oriented step (chunk size 10) wired to the reader, processor, writer
     * and chunk listener, executed on {@link #taskExecutorStepPush()}.
     *
     * @return the configured step
     */
    @Bean
    public Step step() {
        // Each chunk transaction: REQUIRED propagation, READ_COMMITTED isolation.
        DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
        attribute.setPropagationBehavior(Propagation.REQUIRED.value());
        attribute.setIsolationLevel(Isolation.READ_COMMITTED.value());
        return stepBuilderFactory.get("stepNameRegister").<Object, Object>chunk(10)
                .reader(documentReader)
                .processor(registerProcessor)
                .writer(documentWriter)
                .listener(chunkListener)
                .taskExecutor(taskExecutorStepPush())
                // NOTE(review): a throttle limit of 1 is meant to keep chunk executions from
                // overlapping; the chunks may still run on different pool threads in turn.
                .throttleLimit(1)
                .transactionAttribute(attribute)
                .build();
    }
}
SimpleJobExecutionListener
/**
 * Job listener that relaunches the job after every execution: it waits a fixed delay and
 * then calls {@code JobMain#launchJob()} again.
 */
@Component
@Slf4j
public class SimpleJobExecutionListener implements JobExecutionListener {

    /** Pause between the end of one execution and the launch of the next, in milliseconds. */
    private static final long RELAUNCH_DELAY_MILLIS = 30000;

    private final JobMain jobMain;

    /**
     * @param jobMain launcher used to start the next execution; injected lazily
     */
    public SimpleJobExecutionListener(@Lazy JobMain jobMain) {
        this.jobMain = jobMain;
        log.debug("SimpleJobExecutionListener initialize");
    }

    /**
     * Sleeps for the relaunch delay and then starts the next execution.
     *
     * @param jobExecution the execution that has just finished (not inspected here)
     * @throws RuntimeException if the wait is interrupted or the relaunch is rejected; the
     *         original exception is attached as the cause
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        try {
            Thread.sleep(RELAUNCH_DELAY_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting to relaunch the job", e);
        }
        try {
            jobMain.launchJob();
        } catch (JobParametersInvalidException | JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException e) {
            // Keep the cause: a bare RuntimeException hides why the relaunch failed.
            throw new RuntimeException("Failed to relaunch the job", e);
        }
    }

    /**
     * Logs that a job execution is about to start.
     *
     * @param jobExecution the execution about to start (not inspected here)
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.debug("SimpleJobExecutionListener before Job");
    }
}
ChunkListener
@Slf4j
@ Component publi c класс CocoaChunkTransactionIntegration {
public CocoaChunkTransactionIntegration() {
log.debug("CocoaChunkTransactionIntegration initialized");
}
@BeforeChunk
public void onChunkStart(ChunkContext context) throws IOException {
log.debug(t+" - onChunkStart,"+context.getStepContext().getJobParameters().get("jobNameRegister"));
}
@AfterChunk
public void onChunkEnd(ChunkContext context) {
log.debug("onChunkEnd,"+context.getStepContext().getJobParameters().get("jobNameRegister"));
}
@AfterChunkError
public void onChunkError(ChunkContext context) {
Exception e = (Exception) context.getAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY);
log.error("onChunkError,"+context.getStepContext().getJobParameters().get("jobNameRegister"), e);
}
}
DocumentReader
/**
 * Item reader that delegates to a {@link JdbcCursorItemReader} created for each step
 * execution: the cursor is opened in {@link #beforeStep} and closed in {@link #afterStep}.
 */
@Slf4j
@Component
public class DocumentReader implements ItemReader<VwCocoa>, StepExecutionListener {

    private final DataSource dataSource;
    // Typed as VwCocoa so that read() satisfies the declared ItemReader<VwCocoa> contract.
    private JdbcCursorItemReader<VwCocoa> customerJdbcCursorItemReader;
    private ExecutionContext executionContext;

    public DocumentReader(DataSource dataSource) {
        this.dataSource = dataSource;
        log.debug("Reader initialize");
    }

    /**
     * Creates and opens a fresh cursor reader for this step execution.
     *
     * @param stepExecution the step execution about to start (not inspected here)
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        customerJdbcCursorItemReader = new JdbcCursorItemReader<>();
        customerJdbcCursorItemReader.setDataSource(dataSource);
        customerJdbcCursorItemReader.setSql("SELECT * FROM TABLE WHERE 1 = 0");
        customerJdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(VwCocoa.class));
        executionContext = new ExecutionContext();
        customerJdbcCursorItemReader.open(executionContext);
        log.debug("Reader before step");
    }

    /**
     * Closes the cursor reader opened in {@link #beforeStep}.
     *
     * <p>Without this, every job execution leaves its cursor (and the connection behind it)
     * open, which drains the connection pool after as many executions as the pool has
     * connections.
     *
     * @param stepExecution the step execution that has finished (not inspected here)
     * @return {@code null}, as in the original, so this listener supplies no exit status
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (customerJdbcCursorItemReader != null) {
            customerJdbcCursorItemReader.close();
        }
        return null;
    }

    /**
     * Reads the next row from the delegate. The method is synchronized because the step
     * calls it from task-executor threads.
     *
     * @return the next item, or {@code null} when the delegate has no more rows
     * @throws Exception if the delegate fails to read
     */
    @Override
    public synchronized VwCocoa read() throws Exception {
        VwCocoa item = customerJdbcCursorItemReader.read();
        if (item != null) {
            log.debug("Reader read {}", item.getBid());
        } else {
            log.debug("Items readed!!!");
        }
        return item;
    }
}
RegisterProcessor
/**
 * Pass-through item processor: every item is handed to {@link #someJob(Object)} and the
 * result is returned unchanged.
 */
@Slf4j
@Component
public class RegisterProcessor implements ItemProcessor<Object, Object> {

    /**
     * Processes a single item.
     *
     * @param object the item delivered by the reader
     * @return the value produced by {@link #someJob(Object)}
     * @throws Exception declared by the processor contract; not thrown by this body
     */
    @Override
    public Object process(Object object) throws Exception {
        final Object processed = someJob(object);
        return processed;
    }

    /** Placeholder for the real work; currently returns its argument as-is. */
    private Object someJob(Object item) {
        return item;
    }
}
DocumentWriter
/**
 * Item writer that applies one batched SQL update per chunk through a
 * {@link NamedParameterJdbcTemplate}.
 */
@Slf4j
@Component
public class DocumentWriter implements ItemWriter<Object> {

    private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;

    public DocumentWriter(NamedParameterJdbcTemplate namedParameterJdbcTemplate) {
        this.namedParameterJdbcTemplate = namedParameterJdbcTemplate;
        log.debug("Writer initialize");
    }

    /**
     * Runs the update statement once for every item of the chunk, as a single batch.
     *
     * @param list the items of the current chunk; each one supplies the named parameters
     * @throws IOException declared by the original signature; not thrown by this body
     */
    @Override
    public void write(List<? extends Object> list) throws IOException {
        final String sql = "update OBJECT set PROP1 = :prop1, PROP2 = :prop2 where PROP3 = :prop3";
        // The returned update counts are not used.
        namedParameterJdbcTemplate.batchUpdate(sql, SqlParameterSourceUtils.createBatch(list.toArray()));
        log.info("End of write");
    }
}
А вот мой журнал, из которого видно, что чанк выполнялся в 2 потоках:
[ (self-tuning)'] MySpringBootApp : Starting MySpringBootApp on D02DI1859551CMP with PID 26124 (C:\...\Oracle_Home\user_projects\domains\base_domain\servers\AdminServer\tmp\_WL_user\app-1.0.0-SNAPSHOT\jpepki\war\WEB-INF\lib\_wl_cls_gen.jar started by me in C:\...\Oracle_Home\user_projects\domains\base_domain)
[ (self-tuning)'] **MySpringBootApp** : Running with Spring Boot v2.1.9.RELEASE, Spring v5.1.4.RELEASE
[ (self-tuning)'] **MySpringBootApp** : The following profiles are active: dev
[ (self-tuning)'] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type [org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration$$EnhancerBySpringCGLIB$$bed3107e] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[ (self-tuning)'] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1727 ms
[ (self-tuning)'] **DocumentReader** : Reader initialize
[ (self-tuning)'] **ChunkListener** : ChunkListener initialized
[ (self-tuning)'] **RegisterProcessor** : Processor initialize
[ (self-tuning)'] **DocumentWriter** : Writer initialize
[ (self-tuning)'] **SimpleJobExecutionListener** : SimpleJobExecutionListener initialize
[ (self-tuning)'] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'taskExecutorStepPush'
[ (self-tuning)'] **JobMain** : 1583847838770,51, starting a job!
[ (self-tuning)'] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: ORACLE
[ (self-tuning)'] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
[ (self-tuning)'] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=jobEdmaRegister]] launched with the following parameters: [{jobNameRegister=1583847838770}]
[ (self-tuning)'] SimpleJobExecutionListener : SimpleJobExecutionListener before Job
[ (self-tuning)'] o.s.batch.core.job.SimpleStepHandler : Executing step: [stepNameRegister]
[ (self-tuning)'] **DocumentReader** : Reader before step
[cutorStepPush-1] **ChunkListener** : 1583847839661 - onChunkStart,1583847838770
[cutorStepPush-1] **DocumentReader** : Items readed!!!
[cutorStepPush-1] **ChunkListener** : onChunkEnd,1583847838770
[cutorStepPush-2] **ChunkListener** : 1583847839699 - onChunkStart,1583847838770
[cutorStepPush-2] **DocumentReader** : Items readed!!!
[cutorStepPush-2] **ChunkListener** : onChunkEnd,1583847838770
[ (self-tuning)'] SimpleJobExecutionListener : 1583847839782,[ACTIVE] ExecuteThread: '6' for queue: 'weblogic.kernel.Default (self-tuning)'(51), SimpleJobExecutionListener afterJob[COMPLETED],1583847838770