Kafka Log Compaction всегда показывает две последние записи одного и того же ключа - PullRequest
1 голос
/ 25 апреля 2020

Нашел эти два вопроса: здесь и здесь , но я до сих пор не совсем понимаю. Я все еще имею (неожиданное?) Поведение.

Я пытаюсь сжать журнал kafka topi c, используя эту конфигурацию

kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic test1 --config "cleanup.policy=compact" --config "delete.retention.ms=1000" --config "segment.ms=1000" --config "min.cleanable.dirty.ratio=0.01" --config "min.compaction.lag.ms=500"

Затем я отправляю эти сообщения, каждый из них имеет интервал не менее 1 секунды

A: 3
A: 4
A: 5
B: 10
B: 20
B: 30
B: 40
A: 6

То, что я ожидаю, это через несколько секунд (1000 в соответствии с настройкой?), когда я запускаю kafka-console-consumer.sh --bootstrap-server localhost:9092 --property print.key=true --topic test1 --from-beginning, я должен получить

A: 6
B: 40

Вместо Я получил:

A: 5
B: 40
A: 6

Если я опубликовал sh другое сообщение B:50 и запустил получателя, я получил:

B: 40
A: 6
B: 50

вместо ожидаемого

A: 6
B: 50
  1. Собственно, как настроить сжатие журнала?
  2. Из Документация Kafka : Сжатие журнала гарантирует, что Kafka всегда будет сохранять по крайней мере последнее известное значение для каждого сообщения ключ в журнале данных для одного раздела topi c
    Означает ли это, что я могу использовать сжатие журнала только для topi c с одним разделом?

1 Ответ

1 голос
/ 27 апреля 2020

По сути, вы уже ответили сами. Как указано в документации Kafka, «сжатие журнала гарантирует, что Kafka всегда сохранит как минимум последнее известное значение для каждого ключа сообщения в журнале данных для одного раздела topi c». Таким образом, не гарантируется, что у вас всегда будет только одно сообщение для одного ключа.

Если я правильно понимаю сжатие журнала, оно не предназначено для случаев использования, которые вы задали в очень верном вопросе. Скорее, это в конечном итоге означает, что в topi c.

должно быть только одно сообщение на ключ. Сжатие журналов - это механизм, обеспечивающий более детальное хранение каждой записи, а не грубая задержка на основе времени. Идея состоит в том, чтобы выборочно удалять записи, где у нас есть более свежее обновление с тем же первичным ключом. Таким образом, журнал гарантированно будет иметь по крайней мере последнее состояние для каждого ключа.

Уплотненная топи c - правильный выбор, если вы планируете сохранять только последнее состояние для каждого ключа с целью обработки как можно меньшего количества старых состояний (что было бы с не сжатые топи c, в зависимости от времени / размера хранения). Насколько я узнал, варианты использования для сжатия журналов - скорее для сохранения последнего адреса, номера мобильного телефона, значения в базе данных и т. Д. c .. Значения, которые не меняются каждый момент и где у вас обычно много ключей.

С технической точки зрения, я думаю, что в вашем случае произошло следующее.

Когда дело доходит до сжатия, журнал рассматривается как разделенный на две части

  • Очистить : сообщения, которые были сжаты ранее. Этот раздел содержит только одно значение для каждого ключа, которое является самым последним значением во время предыдущего сжатия.
  • Грязные : сообщения, которые были написаны после последнего сжатия.

После создания сообщений B: 40 (A: 5 уже было создано) часть журнала clean пуста, а часть dirty/active содержит A: 5 и B: 40. Сообщение A: 6 еще не является частью журнала. Создание нового сообщения A: 6 запустит сжатие грязной части (поскольку у вас очень низкое соотношение) журнала, но , исключая само новое сообщение . Как уже упоминалось, очищать больше нечего, поэтому новое сообщение будет просто добавлено в topi c и теперь находится в грязной части журнала. То же самое происходит, что вы наблюдали при создании B: 50.

. Кроме того, сжатие никогда не произойдет в вашем активном сегменте. Таким образом, даже если вы установите segment.ms равным 1000 ms, он не будет генерировать новый сегмент, поскольку новые данные не поступают после создания A: 6 или B: 50.

. Чтобы решить вашу проблему и соблюдать Ожидается, что вам нужно создать еще одно сообщение C: 1 после создания A: 6 или B: 50. Таким образом, очиститель может снова сравнить чистые и грязные части журнала и удалит A: 5 или B: 40.

А пока посмотрите, как ведут себя сегменты в вашем каталоге журналов Kafka.

С моей точки зрения, конфигурации для сжатия журналов полностью в порядке! Это просто неправильный вариант использования, чтобы наблюдать ожидаемое поведение. Но в случае производственного использования имейте в виду, что ваши текущие конфигурации пытаются запустить сжатие довольно часто. Это может стать довольно интенсивным вводом / выводом в зависимости от объема ваших данных. Есть причина, по которой коэффициент по умолчанию установлен на 0.50, а log.roll.hours обычно установлен на 24 часа. Кроме того, вы обычно хотите, чтобы у потребителей была возможность прочитать все данные до их сжатия.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...