Почему исключение в Spring Batch AsycItemProcessor обнаружено методом onSkipInWrite SkipListener? - PullRequest
1 голос
/ 29 мая 2020

Я пишу приложение Spring Boot, которое запускается, собирает и преобразует миллионы записей базы данных в новый оптимизированный формат JSON, а затем отправляет их все в GCP PubSub topi c. Я пытаюсь использовать для этого Spring Batch, но у меня возникают проблемы с реализацией отказоустойчивости моего процесса. База данных изобилует проблемами качества данных, и иногда мои преобразования в JSON терпят неудачу. При возникновении сбоев я не хочу, чтобы задание немедленно завершалось, я хочу, чтобы оно продолжало обрабатывать столько записей, сколько может, и до завершения сообщать, какие именно записи завершились ошибкой, чтобы я или моя команда могли их изучить. проблемати c записей в базе.

Для этого я попытался использовать интерфейс SkipListener Spring Batch. Но я также использую AsyncItemProcessor и AsyncItemWriter в своем процессе, и хотя исключения возникают во время обработки, метод SkipListener onSkipInWrite() перехватывает их, а не метод onSkipInProcess(). И, к сожалению, метод onSkipInWrite() не имеет доступа к исходному объекту базы данных, поэтому я не могу сохранить его идентификатор в моем списке проблемных c записей БД.

Я что-то неправильно сконфигурировал? Есть ли другой способ получить доступ к объектам от читателя, который не прошел этап обработки AsynItemProcessor?

Вот что я пробовал ...

У меня есть одноэлементный компонент Spring где я храню, сколько записей БД я успешно обработал, а также до 20 проблемных c записей базы данных.

@Component
@Getter //lombok
public class ProcessStatus {
    private int processed;
    private int failureCount;
    private final List<UnexpectedFailure> unexpectedFailures = new ArrayList<>();

    public void incrementProgress { processed++; }
    public void logUnexpectedFailure(UnexpectedFailure failure) {
        failureCount++;
        unexpectedFailure.add(failure);
    }

    @Getter
    @AllArgsConstructor
    public static class UnexpectedFailure {
        private Throwable error;
        private DBProjection dbData;
    }
}

У меня есть пакетный слушатель пропуска Spring, который должен обнаруживать сбои и обновлять мой компонент состояния соответственно:

@AllArgsConstructor
public class ConversionSkipListener implements SkipListener<DBProjection, Future<JsonMessage>> {
    private ProcessStatus processStatus;

    @Override
    public void onSkipInRead(Throwable error) {}

    @Override
    public void onSkipInProcess(DBProjection dbData, Throwable error) {
        processStatus.logUnexpectedFailure(new ProcessStatus.UnexpectedFailure(error, dbData));
    }

    @Override
    public void onSkipInWrite(Future<JsonMessage> messageFuture, Throwable error) {
        //This is getting called instead!! Even though the exception happened during processing :(
        //But I have no access to the original DBProjection data here, and messageFuture.get() gives me null.
    }
}

А потом я настроил свою работу так:

@Configuration
public class ConversionBatchJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private TaskExecutor processThreadPool;

    @Bean
    public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:100}") Integer chunkSize) {
        return new SimpleCompletionPolicy(chunkSize);
    }

    @Bean
    @StepScope
    public ItemStreamReader<DbProjection> dbReader(
            MyDomainRepository myDomainRepository,
            @Value("#{jobParameters[pageSize]}") Integer pageSize,
            @Value("#{jobParameters[limit]}") Integer limit) {
        RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
        myDomainRepositoryReader.setRepository(myDomainRepository);
        myDomainRepositoryReader.setMethodName("findActiveDbDomains"); //A native query
        myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
            add("ACTIVE");
        }});
        myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
            put("update_date", Sort.Direction.ASC);
        }});
        myDomainRepositoryReader.setPageSize(pageSize);
        myDomainRepositoryReader.setMaxItemCount(limit);
        // myDomainRepositoryReader.setSaveState(false); <== haven't figured out what this does yet
        return myDomainRepositoryReader;
    }

    @Bean
    @StepScope
    public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalSerivice dataRetrievalService) {
        //Sometimes throws exceptions when DB data is exceptionally weird, bad, or missing
        return new DbProjectionToJsonMessageConverter(dataRetrievalService);
    }

    @Bean
    @StepScope
    public AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter(
            ItemProcessor<DbProjection, JsonMessage> dataConverter) throws Exception {
        AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter = new AsyncItemProcessor<>();
        asyncDataConverter.setDelegate(dataConverter);
        asyncDataConverter.setTaskExecutor(processThreadPool);
        asyncDataConverter.afterPropertiesSet();
        return asyncDataConverter;
    }

    @Bean
    @StepScope
    public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
        return new JsonMessageWriter(publisherService);
    }

    @Bean
    @StepScope
    public AsyncItemWriter<JsonMessage> asyncJsonPublisher(ItemWriter<JsonMessage> jsonPublisher) throws Exception {
        AsyncItemWriter<JsonMessage> asyncJsonPublisher = new AsyncItemWriter<>();
        asyncJsonPublisher.setDelegate(jsonPublisher);
        asyncJsonPublisher.afterPropertiesSet();
        return asyncJsonPublisher;
    }

    @Bean
    public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                  ItemStreamReader<DbProjection> dbReader,
                                  AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter,
                                  AsyncItemWriter<JsonMessage> asyncJsonPublisher,
                                  ProcessStatus processStatus,
                                  @Value("${conversion.failure.limit:20}") int maximumFailures) {
        return stepBuilderFactory.get("conversionProcess")
                .<DbProjection, Future<JsonMessage>>chunk(processChunkSize)
                .reader(dbReader)
                .processor(asyncDataConverter)
                .writer(asyncJsonPublisher)
                .faultTolerant()
                .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
                            //  ^ for now this returns true for everything until 20 failures
                    .listener(new ConversionSkipListener(processStatus))
                .build();
    }

    @Bean
    public Job conversionJob(Step conversionProcess) {
        return jobBuilderFactory.get("conversionJob")
                .start(conversionProcess)
                .build();
    }
}

1 Ответ

1 голос
/ 29 мая 2020

Это связано с тем, что будущее, заключенное в AsyncItemProcessor, разворачивается только в AsyncItemWriter, поэтому любое исключение, которое может произойти в это время, рассматривается как исключение записи, а не исключение обработки. Вот почему onSkipInWrite вызывается вместо onSkipInProcess.

На самом деле это известное ограничение этого шаблона, которое задокументировано в Javado c из AsyncItemProcessor , вот выдержка:

Because the Future is typically unwrapped in the ItemWriter,
there are lifecycle and stats limitations (since the framework doesn't know 
what the result of the processor is).

While not an exhaustive list, things like StepExecution.filterCount will not
reflect the number of filtered items and 
itemProcessListener.onProcessError(Object, Exception) will not be called.

В Javado c указано, что список не является исчерпывающим, и побочный эффект в отношении SkipListener, с которым вы сталкиваетесь, является одним из этих ограничений.

...