Иногда (кажется очень случайным) Кафка отправляет старые сообщения.Я хочу только самые последние сообщения, поэтому он перезаписывает сообщения с тем же ключом.В настоящее время, похоже, у меня есть несколько сообщений с одним и тем же ключом, которые не уплотняются.
Я использую этот параметр в теме:
cleanup.policy=compact
Я использую Java / Kotlin и клиент Apache Kafka 1.1.1.
Properties(8).apply {
val jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
val jaasCfg = String.format(jaasTemplate, Configuration.kafkaUsername, Configuration.kafkaPassword)
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS)
put(ConsumerConfig.GROUP_ID_CONFIG,
"ApiKafkaKotlinConsumer${Configuration.kafkaGroupId}")
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer::class.java.name)
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer::class.java.name)
put("security.protocol", "SASL_SSL")
put("sasl.mechanism", "SCRAM-SHA-256")
put("sasl.jaas.config", jaasCfg)
put("max.poll.records", 100)
put("receive.buffer.bytes", 1000000)
}
Я пропустилнекоторые настройки?