Как выполнить политику «Всегда повторять» / «Пользовательскую политику повторов» для потребителя Kafka без изменения баланса в группе - PullRequest
0 голосов
/ 17 октября 2019

Я пытаюсь написать стойкого потребителя Кафки. Если при обработке сообщения в методе слушателя возникает исключение, я бы хотел повторить попытку. Я хотел бы повторить несколько раз для определенных исключений, всегда для определенных исключений и никогда для остальных. Я прочитал документы Spring, касающиеся SeekToCurrentErrorHandler, но не уверен на 100%, как его реализовать.

Я подклассифицировал ExceptionClassifierRetryPolicy и возвращаю соответствующую Политику повтора, основанную на исключениях, возникающих в методе Listener.

Я создал RetryTemplate и установил его RetryPolicy с моей собственной реализацией в подклассе.

Я установил retryTemplate для контейнера Kafka. Я установил обработчик ошибок как новый SeekToCurrentHandler, а также установил свойство повторения с сохранением состояния в true.

Метод прослушивателя

@KafkaListener(topics = "topicName", containerFactory = "containerFactory")
   public void listenToKafkaTopic(@Payload Message<SomeAvroGeneratedClass> message, Acknowledgement ack){
      SomeAvroGeneratedClass obj = message.getPayLoad();
      type.processIncomingMessage();
      ack.acknowledge();
   }

Класс настраиваемой политики повтора

 @Component
 public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
   {
      @PostConstruct
       public void init(){
             final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
             simpleRetryPolicy.setMaxAttempts(8);

     this.setExceptionClassifier( classifiable ->
           {
        // Always Retry when instanceOf   TransientDataAccessException
       if( classifiable.getCause() instanceof TransientDataAccessException)
            {
               return new AlwaysRetryPolicy();                                         

            }
      else if(classifiable.getCause() instanceOf NonTransientDataAccessException)
           {
              return new NeverRetryPolicy();
           }
          else
            {
            return simpleRetryPolicy;
            }

      } );
 }}

Повторить попытку шаблона и контейнера

@Configuration
public class RetryConfig{


 @Bean
 public RetryTemplate retryTemplate(@Autowired ConcurrentKafkaListenerContainerFactory factory){

   RetryTemplate retryTemplate = new RetryTemplate();
   retryTemplate.setRetryPolicy(new MyRetryPolicy());
   FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
   fixedBackOffPolicy.setBackOffPeriod(2000l);
   retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

   factory.setRetryTemplate(retryTemplate);
   factory.setAckOnError(false);
   factory.setErrorHandler(new SeekToCurrentErrorHandler());
   factory.setStateFulRetry(true);
   factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset
   );

  }
}

Свойства прослушивателя:

1) AckMode = Manual 2) auto.offset.commit = false

Вопрос:

1) Могу ли я достичь своей логики повторения, определенной в MyRetryPolicy, без восстановления баланса потребителя при возврате AlwaysRetryPolicy? Если нет, пожалуйста, направьте меня по правильному пути.

2) Правильный ли мой подход в использовании обработчика ошибок и повторных попыток?

1 Ответ

0 голосов
/ 17 октября 2019

Повтор с сохранением состояния в сочетании с SeekToCurrentErrorHandler является правильным подходом.

Однако, если вы используете последнюю версию (2.2 или выше), обработчик ошибок откажется после 10 попыток;Вы можете сделать эту бесконечность, установив maxFailures в -1 (2.2) или backOff в Long.MAX_VALUE (2.3 или выше).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...