Клиентский раздел Kafka обрабатывает незафиксированные сообщения при приостановке и возобновлении - PullRequest
0 голосов
/ 12 марта 2019

Я ищу возможность повторной обработки незафиксированных сообщений после приостановки и возобновления работы пользователя kafka. Может ли кто-нибудь посоветовать мне лучший подход для этого? Выберите паузу / возобновление вместо остановки и перезапуска потребителя, чтобы избежать перебалансировки как части обработки ошибок в приложении.

Перебалансировать, если он работает нормально, как мы и ожидали, с паузой / возобновлением контейнера, но единственное, что он не обрабатывает незафиксированное сообщение после возобновления. Можно ли как-то обработать эти незафиксированные сообщения?

Другая проблема, с которой сталкиваются при остановке и перезапуске, заключается в том, что, как только потребитель запускается после остановки в сценарии ошибки, потребитель может получать незафиксированные сообщения и повторять его, не увеличивая смещение, как ожидалось. Пока этот процесс продолжается, если я опубликую любое другое сообщение, которое может быть успешно обработано, то после того, как оно не получит предыдущее сообщение о сбое / незафиксированное сообщение, будет искать сообщения из следующего смещения в опросе. Как мы можем справиться с этим сценарием?

Если опция поиска к текущему смещению является опцией, может ли кто-нибудь предоставить какой-либо рабочий пример с реализацией поиска к текущему смещению в этом сценарии?

Вкратце, предоставленная конфигурация:

-> обработка отдельной записи вместо пакетной обработки. -> Включить Auto Commit Config, предоставленный как false и следуя режиму MANUAL_IMMEDIATE ack. -> Использование kafkaListenerContainerFactory с параллелизмом, определенным как 1.

Очень ценю предложения по этому вопросу!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...