Когда кафка удаляет сообщения - PullRequest
2 голосов
/ 24 апреля 2019

У меня есть кластер Apache Kafka с удалением политики хранения и периодом хранения 24 часа. Затем я динамически изменил срок хранения и установил его на 1 минуту для какой-то конкретной темы. Но старые сообщения все еще там, поэтому у меня есть несколько вопросов:

  1. Что такое триггерная точка для удержания? Я предполагаю, что хотя некоторое явное время жизни установлено для сообщений, не гарантируется, что сообщения будут удалены именно после этого времени. Так каков процесс? (Ничего не могу найти в ссылке)
  2. Если я изменю срок хранения во время выполнения, будут ли подчиняться старые сообщения. Насколько я понимаю, срок хранения является общим свойством и должен работать также для сообщений, которые были опубликованы с первым сроком хранения.

Ответы [ 2 ]

2 голосов
/ 24 апреля 2019

На каждом брокере разделы делятся на сегментные журналы.По умолчанию сегмент будет хранить 1 ГБ данных (log.segment.bytes) данных.Кроме того, новый сегмент журнала развертывается по умолчанию каждые 7 дней (log.roll.hours)

Каждый брокер планирует поток очистки, который отвечает за периодическую проверку того, какие сегменты имеют право на удаление.По умолчанию чистящий поток будет запускать проверку каждые 5 минут (это можно настроить с помощью конфигурации брокера: log.retention.check.interval.ms)

Сегмент можно удалить, если последнее сообщение вжурнал старше, чем настроенный срок хранения.Кроме того, журнал активного сегмента (тот, в который в данный момент пишет брокер) не может быть удален

Чтобы иметь возможность удалить журнал сегмента как можно скорее, вы должны сконфигурировать журнал в корреляциис вами срок хранения.Например, если период хранения настроен на 24 часа, это может быть хорошим идентификатором для настроенного log.roll.hours на 1 час.

Обратите внимание, что удаление сегмента на самом деле может происходить в разное время у каждого брокера, так какПотоки очистителя запланированы вместе.

Проверка конфигурации конкретной темы с помощью kafka-configs script:

Пример: ./bin/kafka-configs --describe --zookeeper localhost:2181 --entity-type topics --entity-name __consumer_offsets

2 голосов
/ 24 апреля 2019

Политика хранения применяется только к закрытым сегментам. Если ваш сегмент все еще активен, данные в этом сегменте не будут очищаться до тех пор, пока не будет закрыт и новый сегмент не будет открыт.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...