Не фиксируйте смещение, когда в методе прослушивания Kafka возникает исключение - PullRequest
0 голосов
/ 10 октября 2019

У меня есть потребительское приложение kafka, где в методе слушателя аннотируется @Kafkalistener. Я предоставил названия тем и контейнер в качестве входных данных для этой аннотации. Этот метод имеет аргументы типа Message, а также Acknowledgement. Этот метод слушает любые сообщения kafka и, как только он получает, обрабатывает его и сохраняет в БД. После этого я фиксирую смещение вручную, используя методcknowledgement.acknowledge (). Теперь, если какое-либо исключение произойдет до этой строки, смещение не будет зафиксировано вручную. Но я видел, что для свойства контейнера ackOnError по умолчанию установлено значение true. Я хочу получить одно и то же сообщение и обрабатывать его до тех пор, пока оно не будет выполнено успешно, и не хочу перемещать указатель вперед, что в случае перемещения может привести к потере сообщений. Для этого я планирую использовать следующий подход.

1) Класс Listener будет реализовывать интерфейс ConsumerSeekAware. ConsumerSeekCallback будет вызываться экземпляром ThreadLocal, и с помощью этого будет вызываться метод поиска для того же сообщения, которое не удалось обработать.

2) Установите max-poll-records равным 1.

Пожалуйста, помогите мне решить, является ли это правильным подходом. Я не использую Spring kafka 2.x, а скорее 1.2.x. Кроме того, в настоящее время я установил для свойства auto.commit.offset значение false, а для ackMode - значение manual.

1 Ответ

0 голосов
/ 10 октября 2019

1.2.x больше не поддерживается;вы должны обновить как минимум до 1.3.10 как можно скорее;благодаря KIP-62 она имеет гораздо более простую модель многопоточности.

Текущая версия 2.3.0.

2.0.1 представляет SeekToCurrentErrorHandler именно для этой цели.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...