Уплотнение кафки для дедупликации - PullRequest
0 голосов
/ 22 января 2019

Я пытаюсь понять, как работает сжатие Kafka, и у меня возникает следующий вопрос: гарантирует ли kafka уникальность ключей для сообщений, хранящихся в теме с включенным сжатием?

Спасибо!

Ответы [ 2 ]

0 голосов
/ 22 января 2019

Короткий ответ - нет.

Kafka не гарантирует уникальность ключа, сохраненного с включенным сохранением темы.

В Кафке у вас есть два типа cleanup.policy:

  • delete - Это означает, что по истечении заданного времени сообщения не будут доступны. Для этого можно использовать несколько свойств: log.retention.hours, log.retention.minutes, log.retention.ms. По умолчанию log.retention.hours установлено 168. Это означает, что сообщения старше более 7 дней будут удалены
  • compact - Для каждого ключа будет доступно как минимум одно сообщение. В некоторых ситуациях это может быть один, но в большинстве случаев это будет больше. Обработанное уплотнение периодически запускается в фоновом режиме. Он копирует части журнала, удаляя дубликаты и оставляя только последнее значение.

Если вы хотите прочитать только одно значение для каждого ключа, вы должны использовать KTable<K,V> абстракцию от Kafka Streams .

Смежный вопрос относительно последнего значения ключа и уплотнения: Кафка подписаться только на последнее сообщение?

0 голосов
/ 22 января 2019

Глядя на 4 гарантии сжатия kakfa , число 4 состояния:

Любой потребитель, прогрессирующий с начала журнала, увидит хотя бы конечное состояние всех записей в порядке их написания.Кроме того, будут видны все маркеры удаления для удаленных записей при условии, что потребитель достигнет заголовка журнала за период времени, меньший, чем параметр delete.retention.ms в разделе (по умолчанию 24 часа).Другими словами: поскольку удаление маркеров удаления происходит одновременно со считыванием, потребитель может пропустить маркеры удаления, если он отстает более чем на delete.retention.ms.

Итак, вы будетеиметь более одного значения для ключа, если заголовок темы не сохраняется политикой delete.retention.ms.

Насколько я понимаю, если вы установите политику хранения на 24 часа (delete.retention.ms=86400000), у вас будет уникальное значение для одного ключа для всех сообщений, поступивших с 24 часов назад.Это ваш по крайней мере , но не только, поскольку многие другие сообщения для того же ключа, возможно, поступили в течение последних 24 часов.

Таким образом, гарантировано, что вы поймаете по крайней мереодин, но не только последний, потому что сохранение не влияло на последние сообщения.

edit.Как говорится в комментарии крикета, даже если вы установили свойство удаления на 1 день, log.roll.ms - это то, что определяет, когда сегмент журнала закрыт, на основе отметки времени сообщения.Поскольку этот последний сегмент никогда не сохраняется для сжатия, он становится вторым фактором, который не позволяет иметь только последнее значение для вашего известного ключа.Если ваша тема начинается с T0, то сообщения после T0+log.roll.ms будут находиться в открытом сегменте журнала, поэтому не будут уплотнены.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...