Удалите указанную c запись в топике Кафки c, используя сжатие - PullRequest
0 голосов
/ 23 апреля 2020

Я пытаюсь удалить указанное c сообщение или запись из топики Kafka c. Я понимаю, что Кафка не был создан для этого. Но возможно ли использовать сжатие topi c с возможностью замены записи пустой записью с использованием специального ключа c Kafka? Как это можно сделать?

Спасибо

1 Ответ

0 голосов
/ 23 апреля 2020

Да, вы можете избавиться от конкретного сообщения, если у вас есть сжатый топи c.

В этом случае ключ вашего сообщения становится идентификатором. Если вы хотите удалить конкретное сообщение, вам нужно отправить сообщение с тем же ключом и пустым значением в topi c. Это называется tombstone сообщением. Kafka будет хранить эту надгробную плиту в течение настраиваемого промежутка времени (чтобы ваши потребители могли иметь дело с удалением). По истечении заданного промежутка времени поток очистки удалит сообщение-захоронение, и ключ будет удален из раздела в Kafka.

В общем, обратите внимание, что старое (подлежащее удалению) сообщение будет не исчезают сразу. В зависимости от конфигураций, может потребоваться некоторое время, прежде чем произойдет замена отдельного сообщения.

Я нашел это резюме по конфигурациям весьма полезным ( ссылка на блог )

1) Для активации политики очистки уплотнения необходимо поместить cleanup.policy=compact

2) Потребитель видит все надгробия до тех пор, пока потребитель достигает заголовка журнала в период, меньший, чем topi c config delete.retention.ms (по умолчанию 24 часа).

3) Количество этих потоков настраивается с помощью log.cleaner.threads config

4) Затем поток очистки выбирает журнал с помощью самое высокое соотношение грязи. dirty ratio = the number of bytes in the head / total number of bytes in the log(tail + head)

5) Topi c config min.compaction.lag.ms используется для гарантии минимального периода, который должен пройти, прежде чем сообщение может быть сжато.

6) Чтобы установить задержку для начала сжатия записи после их записи используют topi c config log.cleaner.min.compaction.lag.ms. Записи не будут уплотнены до истечения этого периода. Этот параметр дает потребителям время для получения каждой записи.

сжатие журнала представлено как

сжатие журнала гарантирует, что Kafka всегда будет сохранять по крайней мере последнее известное значение для каждого ключа сообщения в журнале данных для одного раздела topi c.

Его гарантии перечислены здесь :

Сжатие журнала обрабатывается средством очистки журнала, пулом фоновых потоков, которые переписывают файлы сегментов журнала, удаляя записи, ключ которых появляется в заголовке журнала. Каждый поток уплотнения работает следующим образом:

1) Он выбирает журнал, который имеет самое высокое отношение головки журнала к хвосту журнала

2) Создает краткую сводку последнего смещения для каждого ключа в заголовке журнала

3) Переписывает журнал от начала до конца, удаляя ключи, которые позже встречаются в журнале. Новые чистые сегменты немедленно добавляются в журнал, поэтому необходимое дополнительное дисковое пространство составляет всего один дополнительный сегмент журнала (а не полная копия журнала).

4) Сводка заголовка журнала, по сути, просто space-compact ha sh стол. Он использует ровно 24 байта на запись. В результате при использовании 8 ГБ чистого буфера одна итерация может очистить около 366 ГБ заголовка журнала (при условии, что в сообщениях по 1 КБ).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...