Исключение во время выполнения Spring Batch в процессоре Item - PullRequest
0 голосов
/ 28 октября 2019

Я изучаю Spring Batch и пытаюсь понять, как работает элементный процессор, во время исключения.

Я читаю данные из CSV-файла в чанке из 3 записей, обрабатываю их и записываю в базу данных.

мой CSV-файл

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doem
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe

Пакетная конфигурация, чтение элементов в блоках 3 и ограничение пропуска 2

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>().name("personItemReader").resource(new ClassPathResource("sample-data.csv")).delimited()
                .names(new String[] { "firstName", "lastName" }).fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
                    {
                        setTargetType(Person.class);
                    }
                }).build();
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)").dataSource(dataSource).build();
    }

    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).listener(listener).flow(step1).end().build();
    }

   @Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
    return stepBuilderFactory.get("step1").<Person, Person> chunk(3).reader(reader()).processor(processor()).writer(writer).faultTolerant().skipLimit(2)
            .skip(Exception.class).build();
}
   }

Я пытаюсь смоделировать исключение, выдав исключениевручную для одной записи в моем процессоре элементов

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

    @Override
    public Person process(final Person person) throws Exception {
        final String firstName = person.getFirstName().toUpperCase();
        final String lastName = person.getLastName().toUpperCase();

        final Person transformedPerson = new Person(firstName, lastName);

        log.info("Converting (" + person + ") into (" + transformedPerson + ")");
        if (person.getLastName().equals("Doem"))
            throw new Exception("DOOM");
        return transformedPerson;
    }
}

Теперь в соответствии с лимитом пропуска, когда генерируется исключение, процессор элементов повторно обрабатывает блок и пропускает элемент, который выдает ошибку, и запись элемента также вставляет всезаписей в БД, кроме одной записи за исключением.

Это все нормально, потому что мой процессор просто конвертирует имя в нижний регистр и может многократно запускаться без какого-либо влияния.

Но давайте предположим, что мой процессор элементов вызывает веб-сервис и отправляет данные. и если какое-то исключение выдается после успешного вызова веб-службы. тогда оставшиеся данные в чанке будут снова обработаны (и снова вызваны веб-сервисом). Я не хочу снова вызывать веб-сервис, потому что это похоже на отправку дублированных данных в веб-сервис, и система веб-сервиса не может идентифицировать дубликаты.

Как обращаться с таким случаем. один из вариантов - не пропускать исключение, что означает, что моя еще одна запись в чанке не попадет в средство записи элементов, даже если процессор вызвал веб-службу. так что это не правильно.

чанк другой опции должен иметь размер 1, тогда это может быть неэффективно при обработке тысяч записей.

какие есть другие опции?

1 Ответ

0 голосов
/ 29 октября 2019

Согласно вашему описанию, ваш предметный процессор не идемпотентен. Однако в разделе Отказоустойчивость документации говорится, что процессор элемента должен быть идемпотентным при использовании отказоустойчивого шага. Вот отрывок:

Если шаг настроен на отказоустойчивость (обычно с использованием пропуска или повторной обработки), любой используемый ItemProcessor должен быть реализован идемпотентно.

...