Обработка сообщений на мертвой букве topi c с помощью Spring Kafka - PullRequest
0 голосов
/ 17 июня 2020

В настоящее время у нас есть конфигурация Dead Letter Topi c (DLT) с использованием Spring Kafka (в приложении Spring Boot). Мы используем DeadLetterPublishingRecoverer в SeekToCurrentErrorHandler. Последнему мы присвоили ConcurrentKafkaListenerContainerFactory.

при обработке наших первых сообщений; из-за глупой ошибки в нашем сервисе мы получили несколько NullPointerException исключений, и 20% сообщений попали в DLT (что является ожидаемым поведением и идеально подходит для нас).

Мы исправили ошибку ошибка, но теперь мы хотим снова обработать эти 20% сообщений. Возможности, которые мы видим:

  • написать небольшое приложение, которое копирует сообщения из DLT в исходный topi c
  • добавить второй @KafkaEventListener в наше приложение, которое читает из DLT

Решение 2 - мое предпочтительное решение, поскольку возвращение его к исходному topi c также подразумевает, что другие группы потребителей снова получат сообщение (обычно должно быть в порядке, так как все наши услуги идемпотентны). ).

Мне было интересно, есть ли другие передовые методы решения этой проблемы. Если нет, мне также было интересно, как я могу динамически активировать / деактивировать @KafkaEventListener для DLT (поскольку вы не хотите, чтобы этот слушатель постоянно был включен)

Спасибо за ваш отзыв!

Йохен

1 Ответ

2 голосов
/ 17 июня 2020

Решение номер 2 мне кажется идеальным.

Мне также было интересно, как я могу динамически активировать / деактивировать @KafkaEventListener для DLT (поскольку вы не хотите, чтобы этот слушатель все время вверх)

Вы можете использовать свойство @KafkaListener autoStartup, введенное с 2.2.

@Autowired
private KafkaListenerEndpointRegistry registry;

@KafkaListener(id = "123", topics = "XXX.DLT", autoStartup = "true"){
    //do your processing
}

//After you are done
registry.getListenerContainer("123").stop();
...