Я пишу приложение Spring Boot, которое запускается, собирает и преобразует миллионы записей базы данных в новый оптимизированный формат JSON, а затем отправляет их все в GCP PubSub topi c. Я пытаюсь использовать для этого Spring Batch, но у меня возникают проблемы с реализацией отказоустойчивости моего процесса. База данных изобилует проблемами качества данных, и иногда мои преобразования в JSON терпят неудачу. При возникновении сбоев я не хочу, чтобы задание немедленно завершалось, я хочу, чтобы оно продолжало обрабатывать столько записей, сколько может, и до завершения сообщать, какие именно записи завершились ошибкой, чтобы я или моя команда могли их изучить. проблемати c записей в базе.
Для этого я попытался использовать интерфейс SkipListener Spring Batch. Но я также использую AsyncItemProcessor и AsyncItemWriter в своем процессе, и хотя исключения возникают во время обработки, метод SkipListener onSkipInWrite()
перехватывает их, а не метод onSkipInProcess()
. И, к сожалению, метод onSkipInWrite()
не имеет доступа к исходному объекту базы данных, поэтому я не могу сохранить его идентификатор в моем списке проблемных c записей БД.
Я что-то неправильно сконфигурировал? Есть ли другой способ получить доступ к объектам от читателя, который не прошел этап обработки AsynItemProcessor?
Вот что я пробовал ...
У меня есть одноэлементный компонент Spring где я храню, сколько записей БД я успешно обработал, а также до 20 проблемных c записей базы данных.
@Component
@Getter //lombok
public class ProcessStatus {
private int processed;
private int failureCount;
private final List<UnexpectedFailure> unexpectedFailures = new ArrayList<>();
public void incrementProgress { processed++; }
public void logUnexpectedFailure(UnexpectedFailure failure) {
failureCount++;
unexpectedFailure.add(failure);
}
@Getter
@AllArgsConstructor
public static class UnexpectedFailure {
private Throwable error;
private DBProjection dbData;
}
}
У меня есть пакетный слушатель пропуска Spring, который должен обнаруживать сбои и обновлять мой компонент состояния соответственно:
@AllArgsConstructor
public class ConversionSkipListener implements SkipListener<DBProjection, Future<JsonMessage>> {
private ProcessStatus processStatus;
@Override
public void onSkipInRead(Throwable error) {}
@Override
public void onSkipInProcess(DBProjection dbData, Throwable error) {
processStatus.logUnexpectedFailure(new ProcessStatus.UnexpectedFailure(error, dbData));
}
@Override
public void onSkipInWrite(Future<JsonMessage> messageFuture, Throwable error) {
//This is getting called instead!! Even though the exception happened during processing :(
//But I have no access to the original DBProjection data here, and messageFuture.get() gives me null.
}
}
А потом я настроил свою работу так:
@Configuration
public class ConversionBatchJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private TaskExecutor processThreadPool;
@Bean
public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:100}") Integer chunkSize) {
return new SimpleCompletionPolicy(chunkSize);
}
@Bean
@StepScope
public ItemStreamReader<DbProjection> dbReader(
MyDomainRepository myDomainRepository,
@Value("#{jobParameters[pageSize]}") Integer pageSize,
@Value("#{jobParameters[limit]}") Integer limit) {
RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
myDomainRepositoryReader.setRepository(myDomainRepository);
myDomainRepositoryReader.setMethodName("findActiveDbDomains"); //A native query
myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
add("ACTIVE");
}});
myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
put("update_date", Sort.Direction.ASC);
}});
myDomainRepositoryReader.setPageSize(pageSize);
myDomainRepositoryReader.setMaxItemCount(limit);
// myDomainRepositoryReader.setSaveState(false); <== haven't figured out what this does yet
return myDomainRepositoryReader;
}
@Bean
@StepScope
public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalSerivice dataRetrievalService) {
//Sometimes throws exceptions when DB data is exceptionally weird, bad, or missing
return new DbProjectionToJsonMessageConverter(dataRetrievalService);
}
@Bean
@StepScope
public AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter(
ItemProcessor<DbProjection, JsonMessage> dataConverter) throws Exception {
AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter = new AsyncItemProcessor<>();
asyncDataConverter.setDelegate(dataConverter);
asyncDataConverter.setTaskExecutor(processThreadPool);
asyncDataConverter.afterPropertiesSet();
return asyncDataConverter;
}
@Bean
@StepScope
public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
return new JsonMessageWriter(publisherService);
}
@Bean
@StepScope
public AsyncItemWriter<JsonMessage> asyncJsonPublisher(ItemWriter<JsonMessage> jsonPublisher) throws Exception {
AsyncItemWriter<JsonMessage> asyncJsonPublisher = new AsyncItemWriter<>();
asyncJsonPublisher.setDelegate(jsonPublisher);
asyncJsonPublisher.afterPropertiesSet();
return asyncJsonPublisher;
}
@Bean
public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
ItemStreamReader<DbProjection> dbReader,
AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter,
AsyncItemWriter<JsonMessage> asyncJsonPublisher,
ProcessStatus processStatus,
@Value("${conversion.failure.limit:20}") int maximumFailures) {
return stepBuilderFactory.get("conversionProcess")
.<DbProjection, Future<JsonMessage>>chunk(processChunkSize)
.reader(dbReader)
.processor(asyncDataConverter)
.writer(asyncJsonPublisher)
.faultTolerant()
.skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
// ^ for now this returns true for everything until 20 failures
.listener(new ConversionSkipListener(processStatus))
.build();
}
@Bean
public Job conversionJob(Step conversionProcess) {
return jobBuilderFactory.get("conversionJob")
.start(conversionProcess)
.build();
}
}