У меня есть служба весенней загрузки (2.1.3), публикующая сообщения в теме kafka (2.12-2.3.0). Служба создает тему и позже, после запуска службы, устанавливает для retention.ms значение 1 секунда.
В настоящее время выполняется отладка этого кода
@SpringBootApplication()
@EnableAsync
public class MetricsMsApplication {
public static void main(String[] args) {
SpringApplication.run(MetricsMsApplication.class, args);
}
@Bean
public NewTopic topic1() {
NewTopic nt = new NewTopic("metrics", 10, (short) 1);
return nt;
}
@EventListener(ApplicationReadyEvent.class)
private void init() throws ExecutionException, InterruptedException {
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
AdminClient client = AdminClient.create(config);
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "metrics");
// Update the retention.ms value
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1000");
Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));
AlterConfigsResult alterConfigsResult = client.alterConfigs(updateConfig);
alterConfigsResult.all();
}
}
Я отправляю пару сообщений и считаю до 5, затем запускаю консольного потребителя
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic admst-metrics --from-beginning
и по-прежнему получать сообщения, срок действия которых должен был истечь.
Журналы kafka показывают, что была применена конфигурация retention.ms. Я добавил cleanup.policy и установил его для удаления, но в этом нет необходимости, поскольку он используется по умолчанию.
Что приведет к удалению этих сообщений?