Сброс kafka LAG (изменение смещения) внутри группы потребителей в Kafka-python - PullRequest
0 голосов
/ 25 апреля 2018

Я нашел это, где я сбросил свою LAG с помощью инструмента kafka-consumer-groups.sh Как изменить начальное смещение для темы? , но мне нужно сбросить его в приложении.Я нашел этот пример, но он не сбрасывает его. чтение kafka-python из последнего созданного сообщения после перезапуска потребителя пример

    consumer = KafkaConsumer("MyTopic", bootstrap_servers=self.kafka_server + ":" + str(self.kafka_port),
                             enable_auto_commit=False,
                             group_id="MyTopic.group")
    consumer.poll()
    consumer.seek_to_end()
    consumer.commit()

    ... continue on with other code...

Запуск bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group MyTopic.group --describe по-прежнему показывает, что оба раздела имеют группу LAG.Как я могу получить текущее смещение в «fast-foward» до конца?

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST             CLIENT-ID
MyTopic         0          52110           66195           14085           kafka-python-1.4.2-6afb6901-c651-4534-a482-15358db42c22 /Host1  kafka-python-1.4.2
MyTopic         1          52297           66565           14268           kafka-python-1.4.2-c70e0a71-7d61-46a1-97bc-aa2726a8109b /Host2  kafka-python-1.4.2

1 Ответ

0 голосов
/ 17 мая 2018

Чтобы «перемотать» вперед смещение группы потребителей, то есть очистить LAG, вам нужно создать нового потребителя, который присоединится к той же группе.
консольная команда для этого:

kafka-console-consumer.sh --bootstrap-server <brokerIP>:9092 --topic <topicName> --consumer-property group.id=<groupName>

Параллельно вы можете запустить команду, чтобы увидеть лаги, как вы описали, и вы увидите, что лаги стерты.

...