Kafka Log Compacted Topi c Значения дублирования для того же ключа не удалены - PullRequest
3 голосов
/ 10 апреля 2020

Журнал сжатых тем не должен хранить дубликаты против того же ключа . Но в нашем случае, когда отправляется новое значение с тем же ключом, предыдущее не удаляется. В чем может быть проблема?

val TestCompactState: KTable[String, TestCompact] = builder.table[String, TestCompact](kafkaStreamConfigs.getString("testcompact-source"),
   (TestCompactmaterialized).withKeySerde(stringSerde).withValueSerde(TestCompactSerde)) 

что я получу Фактический результат

Offsets      Keys        Messages
5            {"id":5}   {"id":5,"namee":"omer","__deleted":"false"}
6            {"id":5}   {"id":5,"namee":"d","__deleted":"false"}

Я просто хочу последнюю одну запись с тем же ключом Ожидаемый результат

6            {"id":5}   {"id":5,"namee":"d","__deleted":"false"}

Ответы [ 2 ]

4 голосов
/ 10 апреля 2020

Насколько я знаю, невозможно применить политику сжатия журналов, чтобы сохранять точно одно сообщение на ключ. Даже если вы установите cleanup.policy=compact (topi c -level) или log.cleanup.policy=compact (глобальный уровень), нет гарантии, что будут сохранены только самые последние сообщения, а старые будут сжаты.

В соответствии с официальной документацией Kafka :

Сжатие журнала дает нам более детальный механизм хранения, так что мы гарантированно сохраняем , по крайней мере, последнее обновление для каждого первичного ключа

2 голосов
/ 10 апреля 2020

Причин такого поведения может быть несколько. Политика очистки уплотнения не запускается после каждого входящего сообщения. Вместо этого существует конфигурация брокера

log.cleaner.min.compaction.lag.ms : минимальное время, в течение которого сообщение остается некомпактным в журнал. Применимо только для сжатых журналов.

Тип: long; По умолчанию: 0; Допустимые значения:; Режим обновления: для всего кластера

По умолчанию используется значение 0, поэтому это может быть не причиной, а проверкой.

Важно отметить, что политика compact никогда сжимает текущий сегмент. Сообщения имеют право на сжатие только на неактивных сегментах. Обязательно проверьте

log.segment.bytes : максимальный размер отдельного файла журнала

Тип: int; По умолчанию: 1073741824; Допустимые значения: [14, ...]; Режим обновления: для всего кластера

Сжатие обычно запускается данными, находящимися в текущем («грязном») сегменте журнала. Термин «грязный» происходит от неочищенного / неуплотненного. Существует еще одна конфигурация, которая помогает управлять уплотнением.

log.cleaner.min.cleanable.ratio : минимальное отношение грязного журнала к общему журналу для журнала, на которое можно претендовать чистка. Если также указаны конфигурации log.cleaner.max.compaction.lag.ms или log.cleaner.min.compaction.lag.ms, то компактор журналов считает журнал пригодным для сжатия, как только: (i) порог грязного соотношения был достигнут, и журнал имел грязные (неуплотненные) записи, по крайней мере, на время log.cleaner.min.compaction.lag.ms или (ii) если журнал имел грязные (неуплотненные) записи не более период log.cleaner.max.compaction.lag.ms.

Тип: double; По умолчанию: 0,5; Допустимые значения:; Режим обновления: для всего кластера

По умолчанию задержка удаления для сообщения, подлежащего сжатию, достаточно высока, как показано в следующем описании конфигурации.

log.cleaner.max.compaction.lag.ms : максимальное время, в течение которого сообщение не может быть сжато в журнале. Применимо только для сжатых журналов.

Тип: long; По умолчанию: 9223372036854775807; Допустимые значения:; Режим обновления: для всего кластера

Подводя итог, можно привести несколько причин, по которым вы наблюдаете то, что описали. Есть хороший блог , который объясняет сжатие журнала более подробно.

...