Обработка потребительских смещений Kafka немного сложнее. Программа-потребитель использует auto.offset.reset config только в том случае, если используемая группа потребителей не имеет допустимого смещения, зафиксированного во внутренней теме Kafka. (Другое поддерживаемое хранилище смещений - Zookeeper, но внутренняя тема Kafka используется как хранилище смещений в последние версии Kafka).
Рассмотрим следующие сценарии:
Потребитель в группе потребителей с именем 'group1' израсходовал 5 сообщений из темы 'testtopic', и сведения о смещениях зафиксированы во внутренней теме Kafka. В следующий раз, когда потребитель запускается, он не будет использовать ' auto. offset.reset 'config. Вместо этого он извлечет сохраненное смещение из хранилища и продолжит извлекать сообщения из полученного смещения.
Потребитель в группе потребителей с именем 'group2' запускается как новый потребитель для получения сообщений из 'testtopic'. Это новая группа, и во внутренней теме Kafka нет доступных сведений о смещении - сейчас используется конфигурация ' auto.offset.reset ', чтобы решить, с чего начать; либо с начала темы, либо с самого последнего (будут использоваться только новые сообщения).
Проблема в соответствии с вашим вопросом заключается в том, что команда сброса смещения не работает, вы должны вручную искать начала и запуска потребителя.
kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> [--topic <topic_name> or --all-topics] --reset-offsets [--to-earliest or --to-offset <offset>] --execute
Существует три возможности сброса команды сброса.
- Срок хранения журнала меньше, а смещение, которое вы пытаетесь сбросить, больше не доступно
- Экземпляр потребителя в группе потребителей запущен. В обоих случаях команда сброса смещения может не работать.
- Кафка версия <0.11. API сброса смещения доступен только для Kafka 0.11 </li>
Судя по вашему вопросу, первый и третий случай маловероятны. Пожалуйста, проверьте для второго случая. Остановите любой работающий экземпляр и попробуйте сбросить смещения.
Команду ниже можно использовать для проверки наличия активного экземпляра группы потребителей.
kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe
Пример вывода:
Consumer group 'group1' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
intro 0 0 99 99