Кафка подписаться только на последнее сообщение? - PullRequest
0 голосов
/ 18 января 2019

Иногда (кажется очень случайным) Кафка отправляет старые сообщения.Я хочу только самые последние сообщения, поэтому он перезаписывает сообщения с тем же ключом.В настоящее время, похоже, у меня есть несколько сообщений с одним и тем же ключом, которые не уплотняются.

Я использую этот параметр в теме:

cleanup.policy=compact

Я использую Java / Kotlin и клиент Apache Kafka 1.1.1.

Properties(8).apply {
    val jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
    val jaasCfg = String.format(jaasTemplate, Configuration.kafkaUsername, Configuration.kafkaPassword)
    put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            BOOTSTRAP_SERVERS)
    put(ConsumerConfig.GROUP_ID_CONFIG,
            "ApiKafkaKotlinConsumer${Configuration.kafkaGroupId}")
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer::class.java.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer::class.java.name)

    put("security.protocol", "SASL_SSL")
    put("sasl.mechanism", "SCRAM-SHA-256")
    put("sasl.jaas.config", jaasCfg)
    put("max.poll.records", 100)
    put("receive.buffer.bytes", 1000000)
}

Я пропустилнекоторые настройки?

1 Ответ

0 голосов
/ 18 января 2019

Если вы хотите иметь только одно значение для каждого ключа, вы должны использовать KTable<K,V> абстракция: StreamsBuilder::table(final String topic) из Kafka Streams .В используемой теме должна быть установлена ​​политика очистки compact.

. Если вы используете KafkaConsumer, вы просто извлекаете данные из брокеров.Он не дает никакого механизма, который выполняет какую-то дедупликацию.В зависимости от того, было выполнено сжатие или нет, вы можете получить от одного до n сообщений для одного и того же ключа.

Относительно сжатия

Сжатие не означает, что все старые значения для одного и того же ключа удалены немедленно.Когда old сообщение для того же ключа будет удалено, зависит от нескольких свойств.Наиболее важными являются:

  • log.cleaner.min.cleanable.ratio

Минимальное отношение грязного бревна к общему бревну для бревна, имеющего право на очистку

  • log.cleaner.min.compaction.lag.ms

Минимальное время, в течение которого сообщение остается некомпактным в журнале.Применимо только для сжатых журналов.

  • log.cleaner.enable

Разрешить запуск процесса очистки журналов на сервере.Должен быть включен, если используются какие-либо темы с cleanup.policy = compact, включая тему внутренних смещений.Если этот параметр отключен, он не будет сжиматься и постоянно увеличиваться в размере.

Более подробную информацию о сжатии вы можете найти https://kafka.apache.org/documentation/#compaction

...