Ниже приведена команда, которая вам понадобится для получения смещения всех разделов для данной темы kafka для для данной группы потребителей :
kafka-consumer-groups --bootstrap-server <kafka-broker-list-with-ports> --describe --group <consumer-group-name>
Пожалуйста,обратите внимание, что <consumer-group-name>
в конце важен, поскольку смещения фиксируются потребителями, которые обычно являются частью группы потребителей.
Вывод этой команды может выглядеть примерно так:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
<topic-name> 0 62 62 0 <consumer-id> <host> <client>
Однако в своем посте вы пытаетесь получить эту информацию для внутренней темы __consumer_offsets
, поэтому вам понадобится группа потребителей, в которой потребители будут использовать эту внутреннюю тему.Возможно, вы могли бы сделать следующее:
kafka-console-consumer --bootstrap-server <kafka-broker-list-with-ports> --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --max-messages 5
Вывод вышеуказанной команды:
[<consumer-group-name>,<topic-name>,0]::[OffsetMetadata[481690879,NO_METADATA],CommitTime 1479708539051,ExpirationTime 1480313339051]
Просто используйте
<consumer-group-name>
из выходных данных и поместите его в команду
kafka-consumer-groups
, упомянутую в начале, и вы получите сведения о смещении для всех 50 разделов для
для данной группы потребителей только.
Надеюсь, это поможет.