У меня есть потребительское приложение kafka, где в методе слушателя аннотируется @Kafkalistener. Я предоставил названия тем и контейнер в качестве входных данных для этой аннотации. Этот метод имеет аргументы типа Message, а также Acknowledgement. Этот метод слушает любые сообщения kafka и, как только он получает, обрабатывает его и сохраняет в БД. После этого я фиксирую смещение вручную, используя методcknowledgement.acknowledge (). Теперь, если какое-либо исключение произойдет до этой строки, смещение не будет зафиксировано вручную. Но я видел, что для свойства контейнера ackOnError по умолчанию установлено значение true. Я хочу получить одно и то же сообщение и обрабатывать его до тех пор, пока оно не будет выполнено успешно, и не хочу перемещать указатель вперед, что в случае перемещения может привести к потере сообщений. Для этого я планирую использовать следующий подход.
1) Класс Listener будет реализовывать интерфейс ConsumerSeekAware. ConsumerSeekCallback будет вызываться экземпляром ThreadLocal, и с помощью этого будет вызываться метод поиска для того же сообщения, которое не удалось обработать.
2) Установите max-poll-records равным 1.
Пожалуйста, помогите мне решить, является ли это правильным подходом. Я не использую Spring kafka 2.x, а скорее 1.2.x. Кроме того, в настоящее время я установил для свойства auto.commit.offset значение false, а для ackMode - значение manual.