У меня есть потребитель, который подключается к темам Kafka. Каждая топика Kafka c имеет 1 раздел. Я перемещаю указатель Кафки на основе стратегии потребления. Для стратегии LATEST
то же самое делается следующим образом:
Извлечение конечных смещений:
Map<TopicPartition, Long> topicEndOffsets = consumer.endOffsets(partitions)
logger.debug(); // log endoffset for every topic
После завершения регистрации я перемещаю смещение topi c в конец, используя seekToEnd
Метод API.
У меня 5 тем, этот лог c отлично работает для всех тем, кроме 1 из топи c, 1 топи c возвращает неправильное смещение в альтернативном прогоне тестового примера. Странно то, что Кафка всегда возвращает значение одно и то же неверное значение (предположим, 798).
Еще несколько замечаний, испытаний и ошибок, которые я сделал:
Предположим, что текущее завершение смещение равно 1000, производитель выдает 2 сообщения. потребитель потребляет эти два сообщения. смещение переходит на 1002. Теперь я перезапускаю тестовый пример потребителя, снова Kafka endOffset () возвращает 798.
В альтернативном неверном значении конечного смещения (798). seekToEnd()
перемещает указатель на 798 вместо 1002.
consumer#position()
API возвращает адрес как 1002, так как конечное смещение отстает на 798. Я попытался переместить смещение, вызвав seek(1002)
API, но вместо этого Kafka переместил указатель конечного смещения на 798.
Я использую клиент Kafka версии 2.0.1. Я мог бы воспроизвести поведение с версией 2.3.0.
Я мог видеть другой вопрос, описывающий то же самое поведение. Согласованность конечных потребителей kafka endOffsets
Поскольку такое поведение повторяется только для 1 из 5 тем, есть ли что-то, что мне не хватает?