Кафка не предоставляет такой механизм повторного запуска из коробки. С опытом использования RabbitMQ, где MQ обеспечивает повторный обмен. Эти биржи называются Dead-Letter-Exchanges
в RabbitMQ.
https://www.rabbitmq.com/dlx.html
Вы можете применить тот же шаблон в случае кафки.
При сбое обработки сообщения мы можем опубликовать копию сообщения в другой теме и дождаться следующего сообщения. Давайте назовем новую тему «retry_topic
». Потребитель 101 retry_topic
’получит сообщение от Kafka, а затем подождет некоторое заранее определенное время, например, один час, прежде чем начинать обработку сообщения. Таким образом, мы можем отложить следующие попытки обработки сообщений без какого-либо влияния на потребителя «main_topic». Если обработка в приемнике «retry_topic» не удалась, мы просто должны отказаться и сохранить сообщение в «fail_topic» для дальнейшей ручной обработки этой проблемы. Код пользователя main_topic может выглядеть следующим образом:
Отправка сообщения в retry_topic при ошибке / исключении
void consumeMainTopicWithPostponedRetry() {
while (true) {
Message message = takeNextMessage("main_topic");
try {
process(message);
} catch (Exception ex) {
publishTo("retry_topic");
LOGGER.warn("Message processing failure. Will try once again in the future.", ex);
}
}
}
Потребитель темы повторных попыток
void consumeRetryTopic() {
while (true) {
Message message = takeNextMessage("retry_topic");
try {
process(message);
waitSomeLongerTime();
} catch (Exception ex) {
publishTo("failed_topic");
LOGGER.warn("Message processing failure. Will skip it.", ex);
}
}
}
Приведенная выше стратегия и примеры взяты из ссылки ниже. Весь кредит достается владельцу поста в блоге.
https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a
О неблокирующем способе выполнения выше можно понять, прочитав весь пост в блоге. Надеюсь, это поможет.