Описание
Я пытался проверить правильность фрагмента данных, которые я посылаю Кафке.Когда я пытался использовать многопроцессорность в матрице, я испортил процесс, а также потребителя сообщений.Потребитель сообщений сначала не был правильно отключен, затем он перестал использовать сообщение.
После этого я перезапустил Kafka на своей локальной машине (я использую докер, поэтому я использовал
docker-compose -f docker-compose-single-broker.yml rm
, чтобы удалить кафку, которую я использовал для тестирования, и заново создал новую, используя
docker-compose -f dokcer-compose-single-broker.yml up
После кафки иkafka-manager был запущен и запущен, но я обнаружил, что хотя я не передаю никаких сообщений в kafka, значение смещения темы, которую я использовал для проверки, не было сброшено до 0.
Для данных на рисунке
"шлюз" - это потребитель, которым я пользовался до и после перезапуска kafka.
"gateway_tester" был темой, которую я использовал для отправки тестовых сообщений.
«Конец 54» (значение красного цвета) - это количество данных, использованных в этом разделе после перезапуска kafka.
«Смещение 899» (значение синим цветом) - количество данных, использованных в этом разделе.прежде чем я перезапустил кафку.
я в замешательстве почему не выключаетсяset number get reset после того, как я перезапустил kafka.
Когда я использовал этого потребителя после перезапуска kafka, он будет использовать все данные, которые я отправил kafka, потому что количество данных меньшечем 899 ...
Затем я создал нового потребителя с именем "gateway_2" для использования данных из той же темы.![image](https://user-images.githubusercontent.com/18712958/45995492-d5eff600-c0ca-11e8-913d-f69b9d42dc14.png)
Как показано на рисунке, на этот раз счетчик смещений соответствовал конечному значению.И все работает отлично.Если я отправляю данные в эту тему и пытаюсь использовать данные с помощью этого нового потребителя «gateway_2», он потребляет новые сообщения, которые я отправил в тему, и игнорирует сообщение, которое он использовал ранее.(Моя настройка смещения 'auto.offset.reset': 'smallest'
)
Мне интересно, есть ли способ сбросить счетчик смещения на потребителе, который я использовал раньше?Или единственный способ решить эту проблему - создать нового потребителя.
Воспроизвести
1) Запустить kafka, создать потребителя и использовать некоторые данные, чтобы изменить счетчик смещений для этого потребителя.
2) Завершите работу kafka.
3) Перезапустите kafka и используйте сообщение от одного и того же потребителя.
4) Потребитель будет использовать все данные в теме доКоличество данных в определенной теме достигает числа смещений.
Конфиги
Версия confluent-kafka-python и librdkafka: confluent_kafka.version(0.11.4)
kafka-python(1.3.5)
(я могне найден confluent_kafka.libversion()
, потому что проект, над которым я работаю, использовал pip для управления пакетами Python, а confluent_kafka.libversion не отображается в файле needs.txt ...)
ApacheВерсия брокера Kafka: 0.9.0.1
Конфигурация клиента:
KAFKA_HOST = '0.0.0.0'
KAFKA_PORT = 9092
KAFKA_HOST_PORT = '%(host)s:%(port)s' % {
'host': KAFKA_HOST,
'port': KAFKA_PORT,
}
kafka_configuration = {
'bootstrap.servers': KAFKA_HOST_PORT,
'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'},
}
(я обновил group.id
со значением gateway
и gateway_2
(для нового коншумер) в моем классе инициализатор)
- Операционная система: macOS 10.13.6