Вчера я обнаружил в журнале, что кафка повторно принимал некоторые сообщения после того, как координатор группы Кафки инициировал ребалансирование группы. Эти сообщения были использованы два дня назад (подтверждено из журнала).
В журнале было зарегистрировано два других перебалансирования, но они больше не пересчитывали сообщения. Так почему же в первый раз перепроверка вызовет повторное использование сообщений? Какие были проблемы?
Я использую клиент golang kafka. вот код
config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest
и мы обрабатываем сообщения перед тем, как запрашивать сообщения, поэтому, похоже, мы используем стратегию «Отправить по крайней мере один раз» для kafka. У нас есть три брокера на одной машине и только один потребительский поток (обычная процедура) на другой машине.
Какие-нибудь объяснения этому феномену?
Я думаю, что сообщения, должно быть, были переданы, потому что они были использованы два дня назад, или почему kafka будет хранить смещения более двух дней без фиксации?
Пример кода потребления:
func (consumer *Consumer) ConsumeClaim(session
sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
realHanlder(message) // consumed data here
session.MarkMessage(message, "") // mark offset
}
return nil
}
Добавлено:
Произошла перебалансировка после перезапуска приложения. Были два других перезапуска, которые не вызывали повторного подсчета
конфиги кафки
log.retention.check.interval.ms = 300000
log.retention.hours = 168
zookeeper.connection.timeout.ms = 6000
group.initial.rebalance.delay.ms = 0
delete.topic.enable = true
auto.create.topics.enable = false