Kafka Consumer с ограниченным количеством повторов при обработке сообщений - PullRequest
0 голосов
/ 28 апреля 2018

Мне трудно найти простые шаблоны для обработки исключений в теме кафки. Сценарий таков: у потребителя я вызываю внешнюю услугу. Если служба недоступна, я хочу повторить попытку несколько раз, а затем прекратить потребление.

Самый простой шаблон - блокирующий синхронный способ работы с ним, что-то вроде этого в java:

ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
    boolean processed=false;
    int count=0;
    while (!processed) {
        try {
            callService(..); 
        } catch (Exception e) {
            if (count++ < 3) {
                Thread.sleep(5000);
                continue;
            } else throw new RuntimeException();
        }
    }
}

Тем не менее, я чувствую, что должен быть более простой подход (без использования сторонних библиотек) и тот, который избегает блокировки потока.

Кажется, это обычное дело, которое мы хотели бы иметь, но я не смог найти простой пример для этого паттерна.

1 Ответ

0 голосов
/ 28 апреля 2018

Кафка не предоставляет такой механизм повторного запуска из коробки. С опытом использования RabbitMQ, где MQ обеспечивает повторный обмен. Эти биржи называются Dead-Letter-Exchanges в RabbitMQ.

https://www.rabbitmq.com/dlx.html

Вы можете применить тот же шаблон в случае кафки.

При сбое обработки сообщения мы можем опубликовать копию сообщения в другой теме и дождаться следующего сообщения. Давайте назовем новую тему «retry_topic». Потребитель 101 retry_topic ’получит сообщение от Kafka, а затем подождет некоторое заранее определенное время, например, один час, прежде чем начинать обработку сообщения. Таким образом, мы можем отложить следующие попытки обработки сообщений без какого-либо влияния на потребителя «main_topic». Если обработка в приемнике «retry_topic» не удалась, мы просто должны отказаться и сохранить сообщение в «fail_topic» для дальнейшей ручной обработки этой проблемы. Код пользователя main_topic может выглядеть следующим образом:

Отправка сообщения в retry_topic при ошибке / исключении

void consumeMainTopicWithPostponedRetry() {
    while (true) {
        Message message = takeNextMessage("main_topic");
        try {
            process(message);
        } catch (Exception ex) {
            publishTo("retry_topic");
            LOGGER.warn("Message processing failure. Will try once again in the future.", ex);
        }
    }
}

Потребитель темы повторных попыток

void consumeRetryTopic() {
    while (true) {
        Message message = takeNextMessage("retry_topic");
        try {
            process(message);
            waitSomeLongerTime();
        } catch (Exception ex) {
            publishTo("failed_topic");
            LOGGER.warn("Message processing failure. Will skip it.", ex);
        }
    }
}

Приведенная выше стратегия и примеры взяты из ссылки ниже. Весь кредит достается владельцу поста в блоге. https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a

О неблокирующем способе выполнения выше можно понять, прочитав весь пост в блоге. Надеюсь, это поможет.

...