Раздел резюме KafkaConsumer не может продолжать получать незафиксированные сообщения - PullRequest
0 голосов
/ 12 ноября 2018

Я использую одну тему, один раздел, одного потребителя, версия клиента Kafka - 0.10.

Я получил два разных результата:

  1. Если я * 1007Сначала * paused section, затем для создания сообщения и вызова метода resume .KafkaConsumer может успешно опросить незафиксированное сообщение.

  2. Но если я сначала создал сообщение и не зафиксировал его смещение, то через несколько секунд приостановить раздел, чтобы вызвать метод возобновления.KafkaConsumer не получит незафиксированное сообщение.Я проверил это на сервере Kafka, используя kafka-consumer-groups.sh, он показывает LOG-END-OFFSET minus CURRENT-OFFSET = LAG = 1.

Я пытался выяснить это в течение двух дней, я повторял такие тесты много раз, результаты всегда такие.Мне нужно предложение, или кто-то может сказать мне оригинальный механизм его Кафки.

1 Ответ

0 голосов
/ 13 ноября 2018

Для вашего наблюдения # 2, если вы перезапустите приложение, оно предоставит вам все записи со смещения без фиксации, т. Е. Отсутствующей записи, и если ваш потребитель снова не подтвердит фиксацию, оно будет отправлено снова, когда приложение зарегистрирует потребителя. с Кафкой при перезагрузке. Ожидается.

Предполагается, что вы используете consumer.poll(), который создает интерфейс гибридной потоковой передачи, то есть, если он накапливает данные, поступающие в Kafka для упомянутого duration, и предоставляет их потребителю для обработки после завершения продолжительности. Это непрерывное накопление происходит в бэкэнде и не зависит от того, зафиксировали ли вы смещение или нет.

KafkaConsumer

Позиция потребителя дает смещение следующей записи, которая будет выдан. Это будет больше, чем максимальное смещение потребитель видел в этом разделе. Это автоматически продвигает каждый время, когда потребитель получает сообщения при вызове на опрос (long).

...