Как пропустить любую ошибку в чанке и перейти к следующим чанкам? - PullRequest
0 голосов
/ 05 февраля 2020

Я создал Reader, Processor и Writer. Я определил размер чанка как 5. У меня есть одна операция в процессоре для каждого элемента. У меня есть две транзакции в Writer. Обновите БД для всех 5 товаров и подтвердите транзакцию для всех 5 товаров в другом месте. Мои предметы не зависят друг от друга, поэтому, если один из них потерпел неудачу, другие предметы не заботятся, они хотят продолжить.

Вариант использования 1:

Если это не удалось в процессоре с любым видом исключения (исключение RESTful, любое исключение java, исключение БД, исключение времени выполнения), скажем, 2-й элемент, я хочу продолжить с 3-го, 4-го и 5-го элементов. Если это не удалось по 4-му пункту, я хочу продолжить с 5-го пункта. Итак, с помощью skip, как я понимаю, когда этот чанк с неисправным элементом в Processor вышел из строя, я могу повторить этот чанк, но без 2-го и 4-го элементов (что не удалось), верно? И если Writer работает хорошо, обе транзакции фиксируются после чанка, а jog запускает следующий чанк со следующими 5 элементами, верно?

Вариант использования 2:

Независимо от того, является ли порция новой или повторяется Вариант использования 1 без этих 2 пунктов, если во Writer произошла ошибка второй транзакции, я хочу откатить первую транзакцию без выполнения отката и фиксации вручную Таким образом, если Write выдает исключение, он автоматически откатит первую транзакцию. И это хорошо. Но я хочу, чтобы даже при исключении и откат транзакции (для этого чанка) я хотел продолжить со следующего чанка таким же образом, с тем же поведением и так далее до последнего чанка.

Вариант использования achive 1 Думаю, мне нужно настроить шаг следующим образом:

@Configuration
@EnableBatchProcessing
@EnableScheduling
@Slf4j
public class BatchConfiguration {

private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final MyItemReader myItemReader;
private final MyItemProcessor myItemProcessor;
private final MyItemWriter myItemWriter;

private final SimpleJobExecutionListener simpleJobExecutionListener;
private final MyChunkListener myChunkListener;

private final ApplicationContext applicationContext;
private final DataSource dataSource;





public BatchConfiguration(
        JobBuilderFactory jobBuilderFactory,
        StepBuilderFactory stepBuilderFactory,
        MyItemReader myItemReader,
        MyItemProcessor myItemProcessor,
        MyItemtWriter myItemWriter,
        SimpleJobExecutionListener simpleJobExecutionListener,
        MyChunkListener myChunkTransactionListener,
        DataSource dataSource,
        ApplicationContext applicationContext) {
    this.jobBuilderFactory = jobBuilderFactory;
    this.stepBuilderFactory = stepBuilderFactory;
    this.myItemReader = myItemReader;
    this.myItemProcessor = myItemProcessor;
    this.myItemWriter = myItemWriter;
    this.simpleJobExecutionListener = simpleJobExecutionListener;
    this.myChunkListener = myChunkListener;
    this.dataSource = dataSource;
    this.applicationContext = applicationContext;
}

@Bean
public Job registrationChunkJob() {
    return jobBuilderFactory.get("MyJob")
            .incrementer(new RunIdIncrementer())
            .listener(simpleJobExecutionListener)
            .flow(step()).end().build();
}

@Bean
TaskExecutor taskExecutorStepPush() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(2);
    taskExecutor.setMaxPoolSize(20);
    taskExecutor.setQueueCapacity(4);
    taskExecutor.setAllowCoreThreadTimeOut(true);
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setThreadNamePrefix(LoggingUtil.getWeblogicName() + "-");
    return taskExecutor;
}

@Bean
public Step step() {
    DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
    attribute.setPropagationBehavior(Propagation.REQUIRED.value());
    attribute.setIsolationLevel(Isolation.READ_COMMITTED.value());

    return stepBuilderFactory.get("myStep").<MyObject, MyObject>chunk(5)
            .reader(myItemReader)
            .processor(myItemProcessor)
            .faultTolerant()
            .writer(myItemWriter)
            .listener(myChunkListener)
            .taskExecutor(taskExecutorStepPush())
            .throttleLimit(5)
            .transactionAttribute(attribute)
            .build();
}

Моя работа не запланирована. Я начинаю следующую работу вручную, когда текущая работа закончена, успешно или нет. Как я уже сказал, я не изменяю флаг в БД с Writer, поэтому, если он потерпел неудачу, а некоторые данные были пропущены и не обновлены в БД (Writer), когда задание завершено и через 1 час оно запустит новое задание и попытается выполнить то же самое ( и, возможно, новые) элементы из БД (Reader выберет их, потому что флаг не будет обновляться как обработанный).

Но почему-то это не работает, и уже поздно, и я не понимаю, почему. Он занимает 5 элементов в чанке, и он не завершился с ошибкой в ​​процессоре, но произошел сбой в Writer при попытке зафиксировать 2 транзакции (вторая не удалась). Он повторяет чанк, но только с одним элементом, с первым элементом и пробовал его 2 раза (с одним элементом, первым элементом), а затем помечает задание как Сбой и останавливается. Который я не хочу. В БД можно выбрать так много элементов, которые могут быть хорошими.

Я не хочу повторять один и тот же фрагмент, если он не получен от Writer. Я хочу повторять чанк только в случае сбоя в процессоре (только для получения хорошего). Кроме того, если произошел сбой блока, я не хочу, чтобы работа прекращалась, я хочу, чтобы работа продолжалась со следующего блока и так далее ... Как этого добиться?

1 Ответ

0 голосов
/ 06 февраля 2020

Как пропустить любую ошибку в чанке и продолжить со следующими элементами?

Чтобы сделать это, вам нужно настроить, какие исключения должны вызывать пропуск элемента, как объяснено раздел Настройка Skip Logi c.

В соответствии с вашей конфигурацией вы не указали ни одно исключение, которое можно пропустить. Ваше определение шага должно быть примерно таким:

@Bean
public Step step() {
   DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
   attribute.setPropagationBehavior(Propagation.REQUIRED.value());
   attribute.setIsolationLevel(Isolation.READ_COMMITTED.value());

   return stepBuilderFactory.get("myStep").<MyObject, MyObject>chunk(5)
        .reader(myItemReader)
        .processor(myItemProcessor)
        .faultTolerant()
        // add skip configuration
        .skipLimit(10)
        .skip(MySkippableException.class)
        .writer(myItemWriter)
        .listener(myChunkListener)
        .taskExecutor(taskExecutorStepPush())
        .throttleLimit(5)
        .transactionAttribute(attribute)
        .build();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...