нам нужно создать компактные темы, которые должны быть уплотнены после определенного размера (сегмент.байт), но НАИБОЛЕЕ ВАЖНО по истечении определенного времени (сегмент.мс) (даже если сегмент.байты не достигли) на уровне конфигурации раздела.
Теперь мы видим, что файл index.bytes соблюдается, но файлgment.ms не соблюдается.Я воспроизвел эту проблему с дистрибутивом Confluent kafka 5.x
https://kafka.apache.org/documentation/#topicconfigs
. Это то, что я прочитал о файле plot.ms в документации apache kafka, и это заставляет меня поверить, что наше понимание верно- тот сегмент.ms переопределяет сегмент.байты - когда дело доходит до того, что кафка делает уплотнение по теме.
сегмент.ms Эта конфигурация управляет периодом времени, после которого Kafka принудительно заставит журнал катиться, даже если файл сегмента не заполнен, чтобы гарантировать, что сохранение может удалить или сжать старые данные.
Я отправляю данные с поворотом ключа между значениями 0-20 и строкой «Spring Kafka Producer and Consumer Example», и я добавляю значение ключа к этой строке.
это код производителя
@Override
public void run(String... strings) throws Exception {
String data = "Spring Kafka Producer and Consumer Example";
for (int j = 0; j < 30000; j++) {
for (int i = 0; i < 20; i++) {
sender.send(new Integer(i).toString(), data + i);
}
}
}
образец кода здесь https://github.com/leofoto/kafka-producer-consumer.git
Я взял образец кода (и изменил его для этого теста) https://memorynotfound.com/spring-kafka-consume-producer-example/
Сначала я создал компактную тему, в журналах брокера я вижу следующее
Создан журнал для раздела my-topic-compact-0 в / tmp / kafka-logsсо свойствами {сжатие.типа -> производитель, сообщение.format.version -> 2.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms ->0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, сегмент.jitter.ms -> 0, предварительное распределение -> false, min.cleanable.dirty.ratio-> 0,5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [удалить],flush.ms -> 9223372036854775807, сегмент.ms -> 604800000, сегмент.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, сегмент.index.bytes -> 10485760, flush.messages -> 9223372036854775807}.(kafka.log.LogManager) [2018-09-17 21: 28: 00,110] INFO [Partition my-topic-compact-0 broker = 0] Не найдена контрольная точка верхнего знака для раздела my-topic-compact-0 (kafka.cluster.Partition)
затем, когда я изменил конфигурацию, чтобы сделать тему компактной
. / kafka-configs --zookeeper localhost: 2181 - темы типа объекта--entity-name my-topic-compact --alter --add-config min.cleanable.dirty.ratio = 0.01, cleanup.policy = compact, сегмент.ms = 12000, delete.retention.ms = 100, сегмент.bytes = 200000 Завершено Обновление конфигурации для сущности: тема 'my-topic-compact'.
Журналы брокера показывают его снова (теперь правильно сообщает о сжатой теме)
[2018-09-17 22: 06: 25,745] ИНФОРМАЦИЯ Обработка уведомлений (уведомлений) для / config / changes (kafka.common.ZkNodeChangeNotificationListener) [2018-09-17 22: 06: 25,746] ИНФОРМАЦИЯ Переопределение обработки для entityPath: themes/ my-topic-compact с config: Карта (cleanup.policy -> compact ,gment.ms -> 12000, min.cleanable.dirty.ratio -> 0.01, сегмент.байт -> 200000, delete.retention.ms -> 100) (kafka.server.DynamicConfigManager)
Также команда kafka-config --describe показывает это ясно
. / Kafka-configs --zookeeper localhost: 2181 - темы типа Entity - Entity-Name my-topic-compact --describe
Конфиги для темы 'my-topic-compact '- это конфиги для темы' my-topic-compact ', такие: plot.bytes = 200000, min.cleanable.dirty.ratio = 0.01, delete.retention.ms = 100, сегмент.ms = 12000, cleanup.policy= compact
когда я запускаю сервер kafka, я вижу следующее сообщение
<< Начало очистки журнала с периодом 300000 мс >> [[Я уверен300 секунд - это значение конфигурации посредника, в данном случае значение уровня темы составляет 12 секунд]]
[2018-09-17 22: 01: 31,215] ИНФОРМАЦИЯ [раздел журнала = my-topic-non-compact-0, dir = / tmp / kafka-logs] Завершена загрузка журнала с 1 сегментом, смещением начала журнала 0 и смещением конца журнала 20 за 2 мс (kafka.log.Log) [2018-09-17 22: 01: 31,218] INFO Загрузка журналов завершена за 378 мс.(kafka.log.LogManager) [2018-09-17 22: 01: 31,224] ИНФОРМАЦИЯ Запуск очистки журнала с периодом 300000 мс.(kafka.log.LogManager) [2018-09-17 22: 01: 31,225] INFO Запуск очистителя журнала с периодом по умолчанию 9223372036854775807 мс.(kafka.log.LogManager) [2018-09-17 22: 01: 31,439] ИНФОРМАЦИЯ Ожидание соединений с сокетом в 0.0.0.0:9092.(kafka.network.Acceptor) [2018-09-17 22: 01: 31,463] ИНФОРМАЦИЯ [SocketServer brokerId = 0] Запущены 1 потоки акцептора (kafka.network.SocketServer) [2018-09-17 22: 01: 31,478] ИНФОРМАЦИЯ[ExpirationReaper-0-Produce]: запуск (kafka.server.DelayedOperationPurgatory $ ExpiredOperationReaper) [2018-09-17 22: 01: 31,478] INFO [ExpirationReaper-0-Fetch]: запуск (kafka.server.DelayedOperationPurgatory $ ExpiredOperationReaper) [2018-09-17 22: 01: 31,479] ИНФОРМАЦИЯ [ExpirationReaper-0-DeleteRecords]: запуск (kafka.server.DelayedOperationPurgatory $ ExpiredOperationReaper) [2018-09-17 22: 01: 31,487] ИНФО [LogDirFailureHandler]: запуск (kafka).server.ReplicaManager $ LogDirFailureHandler) [2018-09-17 22: 01: 31,537] INFO Создание / brokers / ids / 0 (это безопасно? false) (kafka.zk.KafkaZkClient) [2018-09-17 22:01: 31,541] INFO Результат создания znode в / brokers / ids / 0: OK (kafka.zk.KafkaZkClient) [2018-09-17 22: 01: 31,542] INFO Зарегистрированный брокер 0 в пути / brokers / ids / 0 садреса: ArrayBuffer (EndPoint (192.168.0.11,9092, списокenerName (PLAINTEXT), PLAINTEXT)) (kafka.zk.KafkaZkClient)
затем, когда я написал много данных, я увидел, что сегменты также катятся, я вижу много активности, которая подталкивает уплотнениепроизойдет.[это нормально] я отправил более 300 тыс. записей, и происходит сжатие, и новый потребитель, который потребляет сообщения (после сжатия), видит около 3225 записей.
[2018-09-17 22: 09: 21,602] ИНФОРМАЦИЯ [Журнал раздел = my-topic-compact-0, dir = / tmp / kafka-logs] Свернут новый сегмент журнала со смещением 185361 за 4 мс.(kafka.log.Log) [2018-09-17 22: 09: 21,673] ИНФОРМАЦИЯ [ProducerStateManager partition = my-topic-compact-0] Запись снимка производителя по смещению 188897 (kafka.log.ProducerStateManager) [2018-09-17 22: 09: 21,675] INFO [Журнал раздел = my-topic-compact-0, dir = / tmp / kafka-logs] Свернутый новый сегмент журнала со смещением 188897 за 3 мс.(kafka.log.Log) [2018-09-17 22: 09: 21,755] ИНФОРМАЦИЯ [ProducerStateManager partition = my-topic-compact-0] Запись снимка производителя по смещению 192348 (kafka.log.ProducerStateManager) [2018-09-17 22: 09: 21,758] ИНФОРМАЦИЯ [Журнал раздел = my-topic-compact-0, dir = / tmp / kafka-logs] Свернутый новый сегмент журнала со смещением 192348 за 3 мс.(kafka.log.Log) [2018-09-17 22: 09: 21,831] ИНФОРМАЦИЯ [ProducerStateManager partition = my-topic-compact-0] Запись снимка производителя по смещению 195846 (kafka.log.ProducerStateManager) [2018-09-17 22: 09: 21,834] INFO [Журнал раздел = my-topic-compact-0, dir = / tmp / kafka-logs] Свернутый новый сегмент журнала со смещением 195846 за 3 мс.(kafka.log.Log) [2018-09-17 22: 09: 21,879] ИНФОРМАЦИЯ [ProducerStateManager partition = my-topic-compact-0] Запись снимка источника по смещению 199461 (kafka.log.ProducerStateManager) [2018-09-17 22: 09: 21,882] INFO [Журнал раздел = my-topic-compact-0, dir = / tmp / kafka-logs] Свернутый новый сегмент журнала со смещением 199461 за 3 мс.(kafka.log.Log) [2018-09-17 22: 09: 21,909] INFO [ProducerStateManager partition = my-topic-compact-0] Запись снимка производителя по смещению 203134 (kafka.log.ProducerStateManager) [2018-09-17 22: 09: 21,915] INFO [Журнал раздел = my-topic-compact-0, dir = / tmp / kafka-logs] Свернутый новый сегмент журнала со смещением 203134 за 7 мс.(kafka.log.Log) [2018-09-1722: 09: 21,980] ИНФОРМАЦИЯ [Раздел ProducerStateManager = my-topic-compact-0]
Написание снимка производителя по смещению 206703
(kafka.log.ProducerStateManager) [2018-09-17 22: 09: 21,985] ИНФОРМАЦИЯ [Журнал
раздел = my-topic-compact-0, dir = / tmp / kafka-logs] свернутый новый журнал
сегмент со смещением 206703 за 6 мс. (Kafka.log.Log)
теперь независимо от того, сколько времени ожидания (последние 12 секунд), сжатие журнала не запускается
независимо от того, сколько я жду перед выполнением следующей команды (с новой группой потребителей каждый раз)
. / Kafka-console-customer --bootstrap-server localhost: 9092 --topic
my-topic-compact --from-начала --property print.key = true --group
новый group16
Каждый новый потребитель потребляет ровно 3225 сообщений,
Если сжатие должно было произойти после того, как пройден сегмент.ms уровня темы,
он должен был быть сжат до 20 ключей и их последних значений.
Но мы не видим такого поведения. Я что-то упустил?
УДАЛЕНИЯ НЕ РАБОТАЮТ
Кроме того, когда я отправляю нулевую полезную нагрузку для тех же ключей, как это
@Override
public void run(String... strings) throws Exception {
String data = "Spring Kafka Producer and Consumer Example";
for (int j = 0; j < 2; j++) {
for (int i = 0; i < 20; i++) {
sender.send(new Integer(i).toString(), null);
}
}
}
Мы ожидаем, что сообщения будут в конечном итоге удалены к следующему циклу уплотнения. Это не происходит для нас также и после того, как проходит сегмент.ms (в нашем случае 12 секунд в конфигурации уровня темы)
. / Kafka-configs --zookeeper localhost: 2181 - темы типа объекта
--entity-name my-topic-compact --describe
Конфиги для темы 'my-topic-compact' - это Конфиги для темы
«моя тема компактная»
segment.bytes = 200000, min.cleanable.dirty.ratio = 0,01, delete.retention.ms = 100, segment.ms = 12000, cleanup.policy = компактный