У меня есть метод прослушивания Kafka, аннотированный @kafkaListener. Он принимает аргумент типа Message, а также Acknowledgement. Я обрабатываю полученное сообщение и выполняю ручные коммиты с помощью acceptledgement.acknowledge (). Я установил шаблон повтора для контейнера. Политика повторных попыток определена для конкретного исключения. Для этой цели я создал свой собственный класс RetryPloicy и расширил его с помощью ExceptionClassifierRetryPolicy. В этом классе, в зависимости от полученного исключения, я возвращаю AlwaysRetryPolicy, NeverRetryPolicy и SimpleRetryPolicy. Проблема, с которой я сталкиваюсь, заключается в том, что, когда возникает DataAccessException во время обработки сообщения в методе слушателя, я хотел бы повторить попытку навсегда, и я настроил политику повторения соответствующим образом, но метод слушателя всегда выдает исключение ListenerExecutionFailedException вместо обнаруженнойисключение, которое было выброшено ниже стека вверх до метода слушателя из методов обработки сообщений выше. Поскольку это исключение выдается слушателем, моя конфигурация повтора не работает должным образом.
Пример кода приведен ниже:
@KafkaListener(topics = "topicName", containerFactory = "kafkaListenerContainerFactory")
public void listenToKafkaTopic(@Payload Message<SomeAvroType> message, Acknowledgement ack){
SomeAvroType type = message.getPayLoad();
type.processIncomingMessage();
ack.acknowledge();
}
Конфигурация политики повторных попыток
@component
public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
{
@PostConstruct
public void init(){
final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(3);
this.setExceptionClassifier( new Classifier<Throwable, RetryPolicy>()
{
@Override
public RetryPolicy classify( Throwable classifiable ){
// Always Retry when instanceOf TransientDataAccessException
if ( classifiable instanceof TransientDataAccessException)
{
return new AlwaysRetryPolicy;
}
else if(classifiable instanceOf SomeOtherException){
return simpleRetryPolicy;
}
// Do not retry for other exceptions
return new NeverRetryPolicy();
}
} );
}
}
Я использую большую часть автоматической конфигурации, предоставляемой для контейнера, и, следовательно, я автоматически подключаю ConcurrentKafkaListenerContainerFactoryв моем классе Retry Config.
@configuration
public class RetryConfig{
@Bean
public RetryTemplate retryTemplate(@Autowired @Qualifier("kafkaListenerContainerFactory")ConcurrentKafkaListenerContainerFactory factory;){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new MyRetryPolicy());
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
fixedBackOffPolicy.setBackOffPeriod(1000l);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
factory.setRetryTemplate(retryTemplate);
factory.setAckOnError(false);
factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset)
}
}
Когда я запускаю это в режиме отладки и выкидываю исключение TransientDataAccessException в processIncomingMessage (), я ожидаю повторения всегда, но метод слушателя не выдает распространенное исключение, ноон генерирует исключение ListenerExecutionFailedException, а причиной (e.getCause ()) является TransientDataAccessException. Поэтому политика повторных попыток всегда оценивается как NeverretryPloicy. Есть ли способ, при котором в слушателе выдается распространенное исключение, чтобы моя конфигурация повторной попытки выполнялась правильно?