Метод, аннотированный @KafkaListener, не распространяет обнаруженное исключение. Невозможно вызвать мою конфигурацию повторных попыток из-за этого - PullRequest
1 голос
/ 15 октября 2019

У меня есть метод прослушивания Kafka, аннотированный @kafkaListener. Он принимает аргумент типа Message, а также Acknowledgement. Я обрабатываю полученное сообщение и выполняю ручные коммиты с помощью acceptledgement.acknowledge (). Я установил шаблон повтора для контейнера. Политика повторных попыток определена для конкретного исключения. Для этой цели я создал свой собственный класс RetryPloicy и расширил его с помощью ExceptionClassifierRetryPolicy. В этом классе, в зависимости от полученного исключения, я возвращаю AlwaysRetryPolicy, NeverRetryPolicy и SimpleRetryPolicy. Проблема, с которой я сталкиваюсь, заключается в том, что, когда возникает DataAccessException во время обработки сообщения в методе слушателя, я хотел бы повторить попытку навсегда, и я настроил политику повторения соответствующим образом, но метод слушателя всегда выдает исключение ListenerExecutionFailedException вместо обнаруженнойисключение, которое было выброшено ниже стека вверх до метода слушателя из методов обработки сообщений выше. Поскольку это исключение выдается слушателем, моя конфигурация повтора не работает должным образом.

Пример кода приведен ниже:

    @KafkaListener(topics = "topicName", containerFactory = "kafkaListenerContainerFactory")
    public void listenToKafkaTopic(@Payload Message<SomeAvroType> message, Acknowledgement ack){
      SomeAvroType type = message.getPayLoad();
      type.processIncomingMessage();
      ack.acknowledge();
   }

Конфигурация политики повторных попыток

    @component 
    public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
     {
      @PostConstruct
      public void init(){
        final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(3);

        this.setExceptionClassifier( new Classifier<Throwable, RetryPolicy>()
          {
            @Override
            public RetryPolicy classify( Throwable classifiable ){

            // Always Retry when instanceOf TransientDataAccessException
            if ( classifiable instanceof TransientDataAccessException)
            {
                return new AlwaysRetryPolicy;
            }
            else if(classifiable instanceOf SomeOtherException){

              return simpleRetryPolicy; 
           }

            // Do not retry for other exceptions
            return new NeverRetryPolicy();
          }
       } );
     }
   }

Я использую большую часть автоматической конфигурации, предоставляемой для контейнера, и, следовательно, я автоматически подключаю ConcurrentKafkaListenerContainerFactoryв моем классе Retry Config.

    @configuration
    public class RetryConfig{


     @Bean
     public RetryTemplate retryTemplate(@Autowired @Qualifier("kafkaListenerContainerFactory")ConcurrentKafkaListenerContainerFactory factory;){

       RetryTemplate retryTemplate = new RetryTemplate();
       retryTemplate.setRetryPolicy(new MyRetryPolicy());
       FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
       fixedBackOffPolicy.setBackOffPeriod(1000l);
       retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

       factory.setRetryTemplate(retryTemplate);
       factory.setAckOnError(false);
       factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset)

      }
    }

Когда я запускаю это в режиме отладки и выкидываю исключение TransientDataAccessException в processIncomingMessage (), я ожидаю повторения всегда, но метод слушателя не выдает распространенное исключение, ноон генерирует исключение ListenerExecutionFailedException, а причиной (e.getCause ()) является TransientDataAccessException. Поэтому политика повторных попыток всегда оценивается как NeverretryPloicy. Есть ли способ, при котором в слушателе выдается распространенное исключение, чтобы моя конфигурация повторной попытки выполнялась правильно?

1 Ответ

0 голосов
/ 15 октября 2019

См. BinaryExceptionClassifier и его свойство traverseCauses.

/**
 * Create a binary exception classifier.
 * @param defaultValue the default value to use
 * @param typeMap the map of types to classify
 * @param traverseCauses if true, throwable's causes will be inspected to find
 * non-default class
 */
public BinaryExceptionClassifier(Map<Class<? extends Throwable>, Boolean> typeMap, boolean defaultValue,
        boolean traverseCauses) {
    super(typeMap, defaultValue);
    this.traverseCauses = traverseCauses;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...