когда использовать RecoveryCallback против KafkaListenerErrorHandler - PullRequest
0 голосов
/ 28 октября 2019

Я пытаюсь понять, когда я должен использовать org.springframework.retry.RecoveryCallback и org.springframework.kafka.listener.KafkaListenerErrorHandler?

На сегодняшний день я использую класс (реализует org.springframework.retry.RecoveryCallback) для регистрации сообщения об ошибке и отправки сообщения в DLT, и оно работает. Для отправки сообщения в DLT я использую Spring KafkaTemplate, а затем натолкнулся на KafkaListenerErrorHandler и DeadLetterPublishingRecoverer. Теперь, можете ли вы мне предложить, как я должен использовать KafkaListenerErrorHandler и DeadLetterPublishingRecoverer? Может ли это заменить RecoveryCallback?

Вот мой текущий код kafkaListenerContainerFactory

@ Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory () {

ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(primaryConsumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(recoveryCallback);
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setConcurrency(1);  
factory.getContainerProperties().setMissingTopicsFatal(false);
return factory;   }
* 1011

1 Ответ

0 голосов
/ 28 октября 2019

Если он работает так, как вы хотите, зачем его менять?

Есть несколько слоев, и вы можете выбрать, какой из них будет обрабатывать ошибки, в зависимости от ваших потребностей.

  • KafkaListenerErrorHandler будет вызываться для каждой попытки доставки в повторной попытке, поэтому вы обычно не будете использовать ее с повторной попыткой.
  • Повторная попытка RecoveryCallback вызывается после исчерпания повторных попыток (или немедленно, если вы классифицировалиисключение как не повторяемое).
  • ErrorHandler - находится в контейнере и вызывается, если какой-либо слушатель выдает исключение, а не только @KafkaListener s.

С последними версиямиФреймворк, в котором вы можете полностью заменить повтор уровня прослушивания на SeekToCurrentErrorHandler, настроенный на DeadLetterPublishingRecoverer и BackOff.

. DeadLetterPublishingRecoverer предназначен для использования в обработчике ошибок контейнера, так как для него требуется необработанныйConsumerRecord<?, ?>.

KafkaListenerErrorHandler имеет доступ только к пружинным сообщениям Message<?>, преобразованным из ConsumerRecord<?, ?>.

.
...