Kafka Consumer получает несколько (не все) старых сообщений (которые уже были обработаны ранее) - PullRequest
0 голосов
/ 24 июня 2018

У нас есть темы со сроком хранения 7 дней (168 часов).Сообщения потребляются в режиме реального времени, как и когда производитель отправляет сообщение.Все работает как положено.Однако недавно на производственном сервере Devops случайно изменила часовой пояс с PST на EST как часть исправления ОС.

После перезапуска сервера Kafka мы увидели, что потребители потребляют мало (не все, но случайные) старые сообщения.Мы попросили Devops изменить его обратно на PST и перезапустить.Снова старые сообщения снова появились в эти выходные.

Мы не видели этой проблемы в более низких средах (Dev, QA, Stage и т. Д.).

Kafka version: kafka_2.12-0.11.0.2

Любая помощь высоко ценится.

Добавление дополнительной информации ... Недавно наш CentOS обновил патч и каким-то образом администраторы изменили часовой пояс PST на EST и запустили серверы Kafka ...После этого наши потребители начали видеть сообщения со смещения 0. После отладки я обнаружил изменение часового пояса, и администраторы вернулись с EST на PST через 4 дня.Наши производители сообщений регулярно отправляли сообщения до и после смены часового пояса.После смены часового пояса с EST на PST серверы Kafka были перезапущены, и я вижу следующее предупреждение.

Этот журнал произошел, когда мы переключились обратно с EST на PST: (server.log) [2018-06-1318: 36: 34,430] ПРЕДУПРЕЖДЕНИЕ Обнаружен поврежденный файл индекса из-за сбоя требования: найден поврежденный индекс, файл индекса (/app/kafka_2.12-0.11.0.2/data/__consumer_offsets-21/00000000000000002076.index) имеет ненулевой размерно последнее смещение 2076, которое не больше базового смещения 2076.}.удаление /app/kafka_2.12-0.11.0.2/data/__consumer_offsets-21/00000000000000002076.timeindex, /app/kafka_2.12-0.11.0.2/data/__consumer_offsets-21/00000000000000002076.index и /app/kafka_2.12-0.11.0.2 / data / __ consumer_offsets-21 / 00000000000000002076.txnindex и перестроение индекса ... (kafka.log.Log)

Мы перезапустили потребителей после 3 дней изменения часового пояса обратно с EST на PST и начали видетьсообщения потребителей со смещением 0 снова.

1 Ответ

0 голосов
/ 26 июня 2018

Я думаю, это потому, что вы перезапустите программу, прежде чем Commit новые смещения.

Управление смещениями

Для каждой группы потребителей Kafka поддерживает фиксированное смещение для каждого потребляемого раздела. Когда потребитель обрабатывает сообщение, он не удаляет его из раздела. Вместо этого он просто обновляет свое текущее смещение, используя процесс, называемый его фиксацией.

Если после обработки сообщения произойдет сбой потребителя, но перед его смещением, информация о смещении не будет отражать обработку сообщения. Это означает, что сообщение будет снова обработано следующим потребителем в этой группе, которому будет назначен раздел.

Автоматическая фиксация смещений

Самый простой способ зафиксировать смещения - позволить потребителю Kafka сделать это автоматически. Это просто, но дает меньше контроля, чем фиксация вручную. По умолчанию потребитель автоматически фиксирует смещения каждые 5 секунд. Эта фиксация по умолчанию происходит каждые 5 секунд, независимо от того, как продвигается пользователь к обработке сообщений. Кроме того, когда потребитель вызывает poll(), это также приводит к тому, что последнее смещение, возвращенное от предыдущего вызова к poll(), будет зафиксировано (поскольку оно, вероятно, было обработано).

Если зафиксированное смещение опережает обработку сообщений и возникает сбой потребителя, возможно, некоторые сообщения не будут обработаны. Это связано с тем, что обработка возобновляется с зафиксированным смещением, которое позже, чем последнее сообщение, которое должно быть обработано до сбоя. По этой причине, если надежность важнее, чем простота, обычно лучше фиксировать смещения вручную.

Фиксация смещений вручную

Если для ‍ enable.auto.commit установлено значение false, потребитель фиксирует свои смещения вручную. Он может делать это либо синхронно, либо асинхронно. Обычным шаблоном является фиксация смещения последнего обработанного сообщения на основе периодического таймера. Этот шаблон означает, что каждое сообщение обрабатывается по крайней мере один раз, но зафиксированное смещение никогда не опережает ход сообщений, которые активно обрабатываются. Частота периодического таймера контролирует количество сообщений, которые могут быть обработаны после сбоя потребителя. Сообщения извлекаются снова из последнего сохраненного зафиксированного смещения при перезапуске приложения или при перебалансировании группы.

Фиксированное смещение - это смещение сообщений, с которых обработка возобновляется. Обычно это смещение последнего обработанного сообщения плюс один.

Из этой статьи , что, на мой взгляд, очень полезно.

...