Кафка: сообщения, исчезающие из тем, наибольшее время = 0 - PullRequest
0 голосов
/ 28 апреля 2020

У нас есть сообщения, исчезающие из тем на Apache Kafka с версиями 2.3, 2.4.0, 2.4.1 и 2.5.0. Мы заметили это, когда мы развертываем наши кластеры, и, к сожалению, это происходит не каждый раз, поэтому это очень противоречиво.

Иногда мы теряем все сообщения внутри топи c, в других случаях мы теряем все сообщения внутри раздела. Когда это происходит, следующий журнал является константой:

[2020-04-27 10:36:40,386] INFO [Log partition=test-lost-messages-5, dir=/var/kafkadata/data01/data] Deleting segments List(LogSegment(baseOffset=6, size=728, lastModifiedTime=1587978859000, largestTime=0)) (kafka.log.Log)

Существует также предыдущий журнал, в котором говорится, что этот сегмент достиг нарушения времени хранения 48 часов. В этом примере сообщение было создано примерно за 12 минут до развертывания.

Обратите внимание, что все сообщения, которые были ошибочно удалены, имеют largestTime=0, а те, которые были должным образом удалены, имеют действительную метку времени. Из того, что мы читаем из документации и кода, похоже, что largestTime используется для вычисления того, достиг ли данный сегмент временного нарушения или нет.

Поскольку мы можем наблюдать это в нескольких версиях Kafka, мы думаем, что может быть связано с чем-то внешним по отношению к Кафке. Например, Zookeeper.

У кого-нибудь есть идеи, почему это может происходить? Мы используем Zookeeper 3.6.0.

Ответы [ 2 ]

1 голос
/ 08 мая 2020

Мы выяснили, что причина была не в самой Кафке, а в том, где мы хранили логи. Тем не менее, следующее объяснение может быть полезно для образовательных целей:

В деталях, это была проблема с разрешениями, когда Кафка не мог читать файлы .timeindex, когда запускалась программа очистки журналов. Это привело к тому, что largestTime стало 0 и привело к удалению некоторых сообщений задолго до срока хранения.

Каждый раздел topi c разделен на несколько сегментов, а последние затем сохраняются в различные .log файлы, которые содержат фактические сообщения. Для каждого файла .log существует файл .timeindex, содержащий карту между смещением и lastModifiedTime.

Когда Кафке нужно проверить, можно ли удалить сегмент, он ищет самое последнее смещение lastModifiedTime и сохраняет его как самое большое время. Затем проверяет, был ли достигнут срок хранения: currentTime - largestTime > retentionTime.

Если это так, он удаляет сегмент и соответствующие сообщения.

Поскольку Кафка не смог прочитать файл, largestTime был 0, а проверка currentTime > retentionTime всегда была верна для нашего 1-дневного хранения.

0 голосов
/ 28 апреля 2020

Убедитесь, что дата синхронизируется между всеми брокерами Kafka и узлами ZooKeeper.
Bash команда: date.
Сравнение года, дня, часа и минуты.

...