confluent_kafka проблема сброса счетчика смещения потребителя - PullRequest
0 голосов
/ 25 сентября 2018

Описание

Я пытался проверить правильность фрагмента данных, которые я посылаю Кафке.Когда я пытался использовать многопроцессорность в матрице, я испортил процесс, а также потребителя сообщений.Потребитель сообщений сначала не был правильно отключен, затем он перестал использовать сообщение.

После этого я перезапустил Kafka на своей локальной машине (я использую докер, поэтому я использовал

docker-compose -f docker-compose-single-broker.yml rm

, чтобы удалить кафку, которую я использовал для тестирования, и заново создал новую, используя

docker-compose -f dokcer-compose-single-broker.yml up

После кафки иkafka-manager был запущен и запущен, но я обнаружил, что хотя я не передаю никаких сообщений в kafka, значение смещения темы, которую я использовал для проверки, не было сброшено до 0. image Для данных на рисунке

"шлюз" - это потребитель, которым я пользовался до и после перезапуска kafka.

"gateway_tester" был темой, которую я использовал для отправки тестовых сообщений.

«Конец 54» (значение красного цвета) - это количество данных, использованных в этом разделе после перезапуска kafka.

«Смещение 899» (значение синим цветом) - количество данных, использованных в этом разделе.прежде чем я перезапустил кафку.

я в замешательстве почему не выключаетсяset number get reset после того, как я перезапустил kafka.

Когда я использовал этого потребителя после перезапуска kafka, он будет использовать все данные, которые я отправил kafka, потому что количество данных меньшечем 899 ...

Затем я создал нового потребителя с именем "gateway_2" для использования данных из той же темы.image

Как показано на рисунке, на этот раз счетчик смещений соответствовал конечному значению.И все работает отлично.Если я отправляю данные в эту тему и пытаюсь использовать данные с помощью этого нового потребителя «gateway_2», он потребляет новые сообщения, которые я отправил в тему, и игнорирует сообщение, которое он использовал ранее.(Моя настройка смещения 'auto.offset.reset': 'smallest')

Мне интересно, есть ли способ сбросить счетчик смещения на потребителе, который я использовал раньше?Или единственный способ решить эту проблему - создать нового потребителя.

Воспроизвести

1) Запустить kafka, создать потребителя и использовать некоторые данные, чтобы изменить счетчик смещений для этого потребителя.

2) Завершите работу kafka.

3) Перезапустите kafka и используйте сообщение от одного и того же потребителя.

4) Потребитель будет использовать все данные в теме доКоличество данных в определенной теме достигает числа смещений.

Конфиги

  • Версия confluent-kafka-python и librdkafka: confluent_kafka.version(0.11.4) kafka-python(1.3.5) (я могне найден confluent_kafka.libversion(), потому что проект, над которым я работаю, использовал pip для управления пакетами Python, а confluent_kafka.libversion не отображается в файле needs.txt ...)

  • ApacheВерсия брокера Kafka: 0.9.0.1

  • Конфигурация клиента:

    KAFKA_HOST = '0.0.0.0'

    KAFKA_PORT = 9092

    KAFKA_HOST_PORT = '%(host)s:%(port)s' % { 'host': KAFKA_HOST, 'port': KAFKA_PORT, }

    kafka_configuration = { 'bootstrap.servers': KAFKA_HOST_PORT, 'session.timeout.ms': 6000, 'default.topic.config': {'auto.offset.reset': 'smallest'}, }

(я обновил group.id со значением gateway и gateway_2 (для нового коншумер) в моем классе инициализатор)

  • Операционная система: macOS 10.13.6

Ответы [ 2 ]

0 голосов
/ 26 сентября 2018

Я также разместил этот вопрос как проблему на странице github confluent-kafka-python.Мой вопрос был решен участником.

Вот ссылка на проблему: https://github.com/confluentinc/confluent-kafka-python/issues/455

В целом, автор @rnpridgeon говорит, что «Перезапуска одного брокера недостаточно для удаления смещений»,Вам необходимо удалить резервный том, а также в нем хранится содержимое раздела __consumer_offsets, в котором хранятся смещения ваших групп потребителей. '

После этого я проверяю документы докера (https://docs.docker.com/compose/reference/rm/) и выясняю,моей команды docker-compose -f docker-compose-single-broker.yml rm недостаточно для удаления анонимных томов, прикрепленных к контейнеру.

Вместо этого я должен был использовать команду docker-compose -f docker-compose-single-broker.yml rm -v

Тогда моя проблема была решена, смещениезначение сбрасывается после повторного запуска кафки с помощью вышеуказанной команды.

Offset on that consumer get reset!

0 голосов
/ 25 сентября 2018

'auto.offset.reset': 'smallest' означает, что если информация о смещении отсутствует, смещение будет установлено на минимальное доступное значение.

Как только вы получите сообщение от kafka, информация о смещении уже есть, и смещение не будет наименьшим.Когда вы перезапустите потребителя kafka, он получит сообщение, с которого вы остановились в прошлый раз.

Возможно, вы можете попытаться установить для enable.auto.commit значение false, что отключит автоматическую фиксацию смещения, если она не работает, вам может понадобиться искать смещение до наименьшего значения каждый раз, когда вы перезапускаете потребителя, если вы предпочитаетепотреблять из самого раннего сообщения.

...