Ниже приведены конфигурации
@Bean
@Autowired
public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> kafkaListContFactory(@Qualifier("retryTemplate") RetryTemplate retryTemplate, @Qualifier("batchErrorHandler") ErrorHandler errorHandler, @Qualifier("batchErrorHandler") BatchErrorHandler batchErrorHandler) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(errorHandler);
factory.getContainerProperties().setAckOnError(false);
factory.setStatefulRetry(true);
factory.setRetryTemplate(retryTemplate);
}
**Retry config**
@Bean("retryTemplate")
public RetryTemplate retryTemplate() {
final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(delay);
backOffPolicy.setMultiplier(multiplier);
backOffPolicy.setMaxInterval(20000);
final RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(new AlwaysRetryPolicy());
template.setBackOffPolicy(backOffPolicy);
return template;
}
SeekToCurrentErrorHandler config
I do not want to recover and try to retry till it succeeds so I have given maxAttempts to -1
@Bean("errorHandler")
public ErrorHandler errorHandler() {
final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((r, t) -> {
if (t != null && (t instanceof RetryServiceException || t.getCause() instanceof RetryServiceException)) {
logger.error("SeekToCurrentErrorHandler recoverer failure", t.getMessage());
throw new RetryServiceException("SeekToCurrentErrorHandler recoverer failure");
}
}, -1);
return handler;
}
И, наконец, я подтверждаю в методе @KafkaListener, когда исключение не происходит.
У меня вопрос, настроил ли я -1 как максимум попыток, и мой Обработчик ошибок позаботится о повторных попытках. Нужно ли мне retryTemplate? Но повторных попыток не происходит в течение бесконечного времени, и проблема в том, что если я получаю пакетную запись, я обрабатываю те же сообщения, если одно из сообщений не удается в опросе, все они будут обработаны повторно.
Мне нужно использовать batchErrorHandler и реализовать экспоненциальную стратегию отката, чтобы повторные попытки выполнялись с сохранением состояния и избегали повторной обработки одних и тех же успешных сообщений. Может ли кто-нибудь помочь с вышеупомянутой проблемой.
И мне нужно избежать перебалансировки раздела из-за неправильного использования max.poll.interval.ms