Я создал Reader, Processor и Writer. Я определил размер чанка как 5. У меня есть одна операция в процессоре для каждого элемента. У меня есть две транзакции в Writer. Обновите БД для всех 5 товаров и подтвердите транзакцию для всех 5 товаров в другом месте. Мои предметы не зависят друг от друга, поэтому, если один из них потерпел неудачу, другие предметы не заботятся, они хотят продолжить.
Вариант использования 1:
Если это не удалось в процессоре с любым видом исключения (исключение RESTful, любое исключение java, исключение БД, исключение времени выполнения), скажем, 2-й элемент, я хочу продолжить с 3-го, 4-го и 5-го элементов. Если это не удалось по 4-му пункту, я хочу продолжить с 5-го пункта. Итак, с помощью skip, как я понимаю, когда этот чанк с неисправным элементом в Processor вышел из строя, я могу повторить этот чанк, но без 2-го и 4-го элементов (что не удалось), верно? И если Writer работает хорошо, обе транзакции фиксируются после чанка, а jog запускает следующий чанк со следующими 5 элементами, верно?
Вариант использования 2:
Независимо от того, является ли порция новой или повторяется Вариант использования 1 без этих 2 пунктов, если во Writer произошла ошибка второй транзакции, я хочу откатить первую транзакцию без выполнения отката и фиксации вручную Таким образом, если Write выдает исключение, он автоматически откатит первую транзакцию. И это хорошо. Но я хочу, чтобы даже при исключении и откат транзакции (для этого чанка) я хотел продолжить со следующего чанка таким же образом, с тем же поведением и так далее до последнего чанка.
Вариант использования achive 1 Думаю, мне нужно настроить шаг следующим образом:
@Configuration
@EnableBatchProcessing
@EnableScheduling
@Slf4j
public class BatchConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final MyItemReader myItemReader;
private final MyItemProcessor myItemProcessor;
private final MyItemWriter myItemWriter;
private final SimpleJobExecutionListener simpleJobExecutionListener;
private final MyChunkListener myChunkListener;
private final ApplicationContext applicationContext;
private final DataSource dataSource;
public BatchConfiguration(
JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory,
MyItemReader myItemReader,
MyItemProcessor myItemProcessor,
MyItemtWriter myItemWriter,
SimpleJobExecutionListener simpleJobExecutionListener,
MyChunkListener myChunkTransactionListener,
DataSource dataSource,
ApplicationContext applicationContext) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.myItemReader = myItemReader;
this.myItemProcessor = myItemProcessor;
this.myItemWriter = myItemWriter;
this.simpleJobExecutionListener = simpleJobExecutionListener;
this.myChunkListener = myChunkListener;
this.dataSource = dataSource;
this.applicationContext = applicationContext;
}
@Bean
public Job registrationChunkJob() {
return jobBuilderFactory.get("MyJob")
.incrementer(new RunIdIncrementer())
.listener(simpleJobExecutionListener)
.flow(step()).end().build();
}
@Bean
TaskExecutor taskExecutorStepPush() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(2);
taskExecutor.setMaxPoolSize(20);
taskExecutor.setQueueCapacity(4);
taskExecutor.setAllowCoreThreadTimeOut(true);
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setThreadNamePrefix(LoggingUtil.getWeblogicName() + "-");
return taskExecutor;
}
@Bean
public Step step() {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.READ_COMMITTED.value());
return stepBuilderFactory.get("myStep").<MyObject, MyObject>chunk(5)
.reader(myItemReader)
.processor(myItemProcessor)
.faultTolerant()
.writer(myItemWriter)
.listener(myChunkListener)
.taskExecutor(taskExecutorStepPush())
.throttleLimit(5)
.transactionAttribute(attribute)
.build();
}
Моя работа не запланирована. Я начинаю следующую работу вручную, когда текущая работа закончена, успешно или нет. Как я уже сказал, я не изменяю флаг в БД с Writer, поэтому, если он потерпел неудачу, а некоторые данные были пропущены и не обновлены в БД (Writer), когда задание завершено и через 1 час оно запустит новое задание и попытается выполнить то же самое ( и, возможно, новые) элементы из БД (Reader выберет их, потому что флаг не будет обновляться как обработанный).
Но почему-то это не работает, и уже поздно, и я не понимаю, почему. Он занимает 5 элементов в чанке, и он не завершился с ошибкой в процессоре, но произошел сбой в Writer при попытке зафиксировать 2 транзакции (вторая не удалась). Он повторяет чанк, но только с одним элементом, с первым элементом и пробовал его 2 раза (с одним элементом, первым элементом), а затем помечает задание как Сбой и останавливается. Который я не хочу. В БД можно выбрать так много элементов, которые могут быть хорошими.
Я не хочу повторять один и тот же фрагмент, если он не получен от Writer. Я хочу повторять чанк только в случае сбоя в процессоре (только для получения хорошего). Кроме того, если произошел сбой блока, я не хочу, чтобы работа прекращалась, я хочу, чтобы работа продолжалась со следующего блока и так далее ... Как этого добиться?