Spring Kafka SeekToCurrentErrorHandler Узнайте, какая запись не удалась - PullRequest
0 голосов
/ 27 августа 2018

Я реализовал потребителя Kafka с KafkaHandler.Предполагается, что мой потребитель использует события, а затем отправляет запрос REST какой-то другой службе для каждого события.Я хочу повторить попытку, только если эта служба REST не работает.В противном случае я могу проигнорировать сбойное событие.

Моя фабрика контейнеров настроена следующим образом:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
  kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
    new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());
  factory.setStatefulRetry(true);
  factory.setRetryTemplate(retryTemplate());
  factory.setConcurrency(3);

  ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckOnError(false);
  containerProperties.setAckMode(AckMode.RECORD);
  containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());

  return factory;
}

Я использую ExceptionClassifierRetryPolicy для установки исключений и соответствующих политик повторных попыток.

Все выглядит хорошо с повторной попыткой.Он повторяет попытку, когда я получаю ConnectException, и игнорирует, когда я получаю IllegalArgumentException.

Однако, в сценарии IllegalArgumentException, SeekToCurrentErrorHandler стремится вернуться к необработанному смещению (потому что он ищет обратно необработанноесообщения, в том числе сообщение об ошибке), которое заканчивается немедленной повторной попыткой сообщения об ошибке.Потребитель постоянно перемещается назад и вперед и повторяет попытки миллион раз.

Если бы у меня была возможность понять, какая запись не удалась в SeekToCurrentErrorHandler, я бы создал пользовательскую реализацию SeekToCurrentErrorHandler, чтобы проверить, нет ли сообщения об ошибке.повторяется или нет (с помощью поля thrownException).Если это невозможно повторить, я бы удалил его из списка records для поиска.

Есть идеи о том, как добиться этой функциональности?

Примечание: enable.auto.commit установленоfalse, auto.offset.reset - earliest.

Спасибо!

1 Ответ

0 голосов
/ 27 августа 2018

Существует FailedRecordTracker, начиная с Spring для Apache Kafka 2.2 (пока не выпущено):

https://docs.spring.io/spring-kafka/docs/2.2.0.M2/reference/html/whats-new-part.html#_listener_container_changes

Начиная с версии 2.2, SeekToCurrentErrorHandlerТеперь можно восстановить (пропустить) запись, которая продолжает сбой.По умолчанию, после 10 сбоев, запись о сбое будет записана (ОШИБКА).Вы можете настроить обработчик с помощью пользовательского восстановителя (BiConsumer) и / или макс. Сбоев.

SeekToCurrentErrorHandler errorHandler =
    new SeekToCurrentErrorHandler((record, exception) -> {
          // recover after 3 failures - e.g. send to a dead-letter topic
          }, 3);

Итак, вам просто нужно скопировать / вставить FailedRecordTracker и SeekToCurrentErrorHandlerИсходный код из master в ваш проект, и вы получите функциональность, которую вы ищете:

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...