Повторная обработка / чтение записей / сообщений Kafka - Какова цель сброса смещения группы потребителей? - PullRequest
2 голосов
/ 27 июня 2019

В моей теме кафки всего 10 записей / сообщений и 2 раздела по 5 сообщений в каждой. В моей группе потребителей 2 потребителя, и каждый из них уже прочитал 5 сообщений из назначенного им раздела соответственно. Теперь я хочу заново обработать / прочитать сообщения из моей темы от начала / начала (смещение 0).

Я остановил своих потребителей kafka и выполнил следующую команду, чтобы сбросить смещение группы потребителей на 0.

./kafka-consumer-groups.sh --group cg1 --reset-offsets --to-offset 0 --topic t1 --execute --bootstrap-server "..."

Я ожидал, что, как только я перезапущу своих потребителей кафки, они начнут читать записи со смещения 0, то есть с начала, но этого не произошло, и они опрашивают свою последнюю позицию, то есть смещение 5. Почему это так? Затем я должен заставить каждого из моих потребителей явно стремиться сместить 0 (начало), чтобы заново обработать / прочитать записи с самого начала. А в последующих циклах тестирования я даже не запускал команду выше для сброса смещения для группы потребителей kafka.

У меня вопрос: если я должен заставить своих потребителей явно стремиться начать заставлять их заново обрабатывать / читать сообщения, то какова цель сброса смещения группы потребителей kafka?

1 Ответ

4 голосов
/ 30 июня 2019

Обработка потребительских смещений Kafka немного сложнее. Программа-потребитель использует auto.offset.reset config только в том случае, если используемая группа потребителей не имеет допустимого смещения, зафиксированного во внутренней теме Kafka. (Другое поддерживаемое хранилище смещений - Zookeeper, но внутренняя тема Kafka используется как хранилище смещений в последние версии Kafka).

Рассмотрим следующие сценарии:

  1. Потребитель в группе потребителей с именем 'group1' израсходовал 5 сообщений из темы 'testtopic', и сведения о смещениях зафиксированы во внутренней теме Kafka. В следующий раз, когда потребитель запускается, он не будет использовать ' auto. offset.reset 'config. Вместо этого он извлечет сохраненное смещение из хранилища и продолжит извлекать сообщения из полученного смещения.

  2. Потребитель в группе потребителей с именем 'group2' запускается как новый потребитель для получения сообщений из 'testtopic'. Это новая группа, и во внутренней теме Kafka нет доступных сведений о смещении - сейчас используется конфигурация ' auto.offset.reset ', чтобы решить, с чего начать; либо с начала темы, либо с самого последнего (будут использоваться только новые сообщения).

Проблема в соответствии с вашим вопросом заключается в том, что команда сброса смещения не работает, вы должны вручную искать начала и запуска потребителя.

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> [--topic <topic_name> or --all-topics] --reset-offsets [--to-earliest or --to-offset <offset>] --execute

Существует три возможности сброса команды сброса.

  1. Срок хранения журнала меньше, а смещение, которое вы пытаетесь сбросить, больше не доступно
  2. Экземпляр потребителя в группе потребителей запущен. В обоих случаях команда сброса смещения может не работать.
  3. Кафка версия <0.11. API сброса смещения доступен только для Kafka 0.11 </li>

Судя по вашему вопросу, первый и третий случай маловероятны. Пожалуйста, проверьте для второго случая. Остановите любой работающий экземпляр и попробуйте сбросить смещения.

Команду ниже можно использовать для проверки наличия активного экземпляра группы потребителей.

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe

Пример вывода:

Consumer group 'group1' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
intro           0          0               99              99 
...