Обзор: у меня есть лямбда-установка с Kafka Consumer, которая опрашивает сообщения из кластера Kafka и индексирует их в домен Elasticsearch. Лямбда вызывается каждую минуту, что означает, что идентификатор клиента Kafka Consumer изменяется, но идентификатор группы Kafka всегда остается неизменным. Важно отметить, что у потребителя отключено автоматическое принятие, и мы вручную фиксируем смещения после успешной индексации сообщений. В случае неудачной индексации наш код просто перехватывает ошибку и регистрирует ее и не фиксирует эти сообщения.
Однако я заметил, что когда происходит индексация, когда потребитель не передает неиндексированные сообщенияэти сообщения также не отображаются в последующих опросах. Любые сообщения, опубликованные после неиндексированных сообщений, занимают смещение незафиксированных сообщений.
Чтобы воссоздать этот сценарий, я провел небольшой тест и сохранил журналы: h
https://pastebin.com/6aDzbvD4
ИзВ журналах видно, что одна запись размером 415 КБ была опрошена в 16:06 и не была проиндексирована в ES. В последующих опросах после 16:06 незафиксированные сообщения нигде не видны. В 16:11 я опубликовал еще одно сообщение размером 3 КБ для Kafka, которое было успешно опрошено потребителем, хотя и с тем же смещением, что и предыдущее незафиксированное сообщение. Это новое сообщение было проиндексировано и зафиксировано успешно.
Я новичок в Kafka и пытаюсь понять, почему это могло произойти. У меня также есть значение конфигурации auto.offset.reset, равное «самым ранним», поэтому потребитель должен иметь возможность видеть самые ранние незафиксированные сообщения. Буду признателен за любую помощь по этому поводу!