Kafka Consumer перезаписывает смещения непринятых сообщений - PullRequest
0 голосов
/ 29 октября 2019

Обзор: у меня есть лямбда-установка с Kafka Consumer, которая опрашивает сообщения из кластера Kafka и индексирует их в домен Elasticsearch. Лямбда вызывается каждую минуту, что означает, что идентификатор клиента Kafka Consumer изменяется, но идентификатор группы Kafka всегда остается неизменным. Важно отметить, что у потребителя отключено автоматическое принятие, и мы вручную фиксируем смещения после успешной индексации сообщений. В случае неудачной индексации наш код просто перехватывает ошибку и регистрирует ее и не фиксирует эти сообщения.

Однако я заметил, что когда происходит индексация, когда потребитель не передает неиндексированные сообщенияэти сообщения также не отображаются в последующих опросах. Любые сообщения, опубликованные после неиндексированных сообщений, занимают смещение незафиксированных сообщений.

Чтобы воссоздать этот сценарий, я провел небольшой тест и сохранил журналы: h https://pastebin.com/6aDzbvD4

ИзВ журналах видно, что одна запись размером 415 КБ была опрошена в 16:06 и не была проиндексирована в ES. В последующих опросах после 16:06 незафиксированные сообщения нигде не видны. В 16:11 я опубликовал еще одно сообщение размером 3 КБ для Kafka, которое было успешно опрошено потребителем, хотя и с тем же смещением, что и предыдущее незафиксированное сообщение. Это новое сообщение было проиндексировано и зафиксировано успешно.

Я новичок в Kafka и пытаюсь понять, почему это могло произойти. У меня также есть значение конфигурации auto.offset.reset, равное «самым ранним», поэтому потребитель должен иметь возможность видеть самые ранние незафиксированные сообщения. Буду признателен за любую помощь по этому поводу!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...