Spring Kafka позволяет создавать новые темы, объявляя @Bean
s в контексте вашего приложения. Это потребует bean-компонента типа KafkaAdmin
в контексте приложения, который будет создан автоматически при использовании Spring Boot. Вы можете определить свою тему следующим образом:
@Bean
public NewTopic myTopic() {
return TopicBuilder.name("my-topic")
.partitions(4)
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
.build();
}
Если вы не используете Spring Boot, вам дополнительно нужно определить bean-компонент KafkaAdmin
:
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
return new KafkaAdmin(configs);
}
Если вы хотите отредактировать конфигурацию существующей темы, вам придется использовать AdminClient
, вот фрагмент, чтобы изменить retention.ms
на уровне темы:
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
AdminClient client = AdminClient.create(config);
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");
// Update the retention.ms value
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));
AlterConfigsResult alterConfigsResult = client.alterConfigs(updateConfig);
alterConfigsResult.all();