Как настроить политику хранения тем kafka при создании в spring-mvc? - PullRequest
0 голосов
/ 11 июня 2019

Мне нужно настроить политику хранения определенной темы при создании. Я пытался найти решение, я мог только найти команду уровня команды изменить, как показано ниже

. / Bin / kafka-topics.sh --zookeeper localhost: 2181 --alter --topic my-topic --config retention.ms = 1680000

Может кто-нибудь сообщить мне способ его настройки при создании, что-то вроде XML или настройки файла свойств в spring-mvc.

Ответы [ 2 ]

1 голос
/ 11 июня 2019

Spring Kafka позволяет создавать новые темы, объявляя @Bean s в контексте вашего приложения. Это потребует bean-компонента типа KafkaAdmin в контексте приложения, который будет создан автоматически при использовании Spring Boot. Вы можете определить свою тему следующим образом:

@Bean
public NewTopic myTopic() {
    return TopicBuilder.name("my-topic")
            .partitions(4)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
            .build();
}

Если вы не используете Spring Boot, вам дополнительно нужно определить bean-компонент KafkaAdmin:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    return new KafkaAdmin(configs);
}

Если вы хотите отредактировать конфигурацию существующей темы, вам придется использовать AdminClient, вот фрагмент, чтобы изменить retention.ms на уровне темы:

Map<String, Object> config = new HashMap<>();                
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

AdminClient client = AdminClient.create(config);

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");

// Update the retention.ms value
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));

AlterConfigsResult alterConfigsResult = client.alterConfigs(updateConfig);
alterConfigsResult.all();
1 голос
/ 11 июня 2019

Полагаю, вы могли бы использовать клиент администратора (https://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html) для этого. Вы можете создать экземпляр клиента администратора в вашем приложении и использовать команду создания или изменения темы для управления конфигурациями темы, включая сохранение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...