Я ищу возможность повторной обработки незафиксированных сообщений после приостановки и возобновления работы пользователя kafka. Может ли кто-нибудь посоветовать мне лучший подход для этого? Выберите паузу / возобновление вместо остановки и перезапуска потребителя, чтобы избежать перебалансировки как части обработки ошибок в приложении.
Перебалансировать, если он работает нормально, как мы и ожидали, с паузой / возобновлением контейнера, но единственное, что он не обрабатывает незафиксированное сообщение после возобновления. Можно ли как-то обработать эти незафиксированные сообщения?
Другая проблема, с которой сталкиваются при остановке и перезапуске, заключается в том, что, как только потребитель запускается после остановки в сценарии ошибки, потребитель может получать незафиксированные сообщения и повторять его, не увеличивая смещение, как ожидалось. Пока этот процесс продолжается, если я опубликую любое другое сообщение, которое может быть успешно обработано, то после того, как оно не получит предыдущее сообщение о сбое / незафиксированное сообщение, будет искать сообщения из следующего смещения в опросе. Как мы можем справиться с этим сценарием?
Если опция поиска к текущему смещению является опцией, может ли кто-нибудь предоставить какой-либо рабочий пример с реализацией поиска к текущему смещению в этом сценарии?
Вкратце, предоставленная конфигурация:
-> обработка отдельной записи вместо пакетной обработки.
-> Включить Auto Commit Config, предоставленный как false и следуя режиму MANUAL_IMMEDIATE ack.
-> Использование kafkaListenerContainerFactory с параллелизмом, определенным как 1.
Очень ценю предложения по этому вопросу!