Возможные причины повторного использования сообщений Kafka - PullRequest
0 голосов
/ 02 июля 2019

Вчера я обнаружил в журнале, что кафка повторно принимал некоторые сообщения после того, как координатор группы Кафки инициировал ребалансирование группы. Эти сообщения были использованы два дня назад (подтверждено из журнала).

В журнале было зарегистрировано два других перебалансирования, но они больше не пересчитывали сообщения. Так почему же в первый раз перепроверка вызовет повторное использование сообщений? Какие были проблемы?

Я использую клиент golang kafka. вот код

config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest 

и мы обрабатываем сообщения перед тем, как запрашивать сообщения, поэтому, похоже, мы используем стратегию «Отправить по крайней мере один раз» для kafka. У нас есть три брокера на одной машине и только один потребительский поток (обычная процедура) на другой машине.

Какие-нибудь объяснения этому феномену? Я думаю, что сообщения, должно быть, были переданы, потому что они были использованы два дня назад, или почему kafka будет хранить смещения более двух дней без фиксации?

Пример кода потребления:

func (consumer *Consumer) ConsumeClaim(session 
sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

for message := range claim.Messages() {
    realHanlder(message)   // consumed data here
    session.MarkMessage(message, "") // mark offset
}

return nil

}

Добавлено:

  1. Произошла перебалансировка после перезапуска приложения. Были два других перезапуска, которые не вызывали повторного подсчета

  2. конфиги кафки

    log.retention.check.interval.ms = 300000 log.retention.hours = 168
    zookeeper.connection.timeout.ms = 6000
    group.initial.rebalance.delay.ms = 0
    delete.topic.enable = true
    auto.create.topics.enable = false

...