При выполнении задания Spring-Batch Partitioned задание завершается ненормально (в состоянии сбоя), как только последний поток завершает обработку.
Казалось бы, как будто все данные обрабатываются ... однако состояние задания НЕ УКАЗАНО по весне.
По сути, я пытаюсь обработать 100 ГБ данных (1000 файлов ZIP, каждый из которых содержитоколо 50000 текстовых файлов), поэтому ~ 50MIL записей).Размер моего чанка равен 1 КБ, а размер основного пула - 50, максимальное - 100.
Разделитель является MultiResourcePartitioner, а
Я пытался изменить размер чанка, размер пакета и т. Д.
Работа выглядит хорошо при меньшем количестве почтовых индексов, но при большем количестве (возможно, просто из-за большего количества потоков) в конце взрывается ...
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(50);
taskExecutor.setMaxPoolSize(100);
taskExecutor.setThreadNamePrefix("threadPoolExecutor-");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
@Bean
public <T> Step step() throws Exception {
LOG.info("Chunksize is {}", chunkSize);
return stepBuilderFactory.get("step")
.<Message, T>chunk(chunkSize)
.faultTolerant()
.noRollback(OurServiceException.class)
.reader(fileReader(null))
.processor(processor())
.faultTolerant()
.skipPolicy(new ExceptionSkipper())
.writer(writer())
.build();
}
@Bean
public Step partitionStep(Step step) throws Exception {
return stepBuilderFactory.get("partitionStep")
.partitioner(step)
.partitioner("step", partitioner())
.taskExecutor(taskExecutor())
.build();
}
Wrapped by: org.springframework.retry.RetryException: Non-skippable exception in recoverer while processing; nested exception is java.lang.NullPointerException
at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$2.recover(FaultTolerantChunkProcessor.java:289)
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:512)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:351)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:217)
at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.transform(FaultTolerantChunkProcessor.java:298)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:202)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:272)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2019-07-06 12:58:31.665 ERROR [o.s.b.c.s.AbstractStep] [main] - Encountered an error executing step partitionStep in job myJob
org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)