Использовать свойства конфигурации и проекцию коллекции ...
@ConfigurationProperties("service.kafka.producer")
@Component
public class ConfigProps {
List<Topic> topics = new ArrayList<>();
public List<Topic> getTopics() {
return this.topics;
}
public void setTopics(List<Topic> topics) {
this.topics = topics;
}
@Override
public String toString() {
return "ConfigProps [topics=" + this.topics + "]";
}
public static class Topic {
private String name;
private int numPartitions;
private short replicationFactor;
public String getName() {
return this.name;
}
public void setName(String name) {
this.name = name;
}
public int getNumPartitions() {
return this.numPartitions;
}
public void setNumPartitions(int numPartitions) {
this.numPartitions = numPartitions;
}
public short getReplicationFactor() {
return this.replicationFactor;
}
public void setReplicationFactor(short replicationFactor) {
this.replicationFactor = replicationFactor;
}
@Override
public String toString() {
return "Topic [name=" + this.name + ", numPartitions=" + this.numPartitions + ", replicationFactor="
+ this.replicationFactor + "]";
}
}
}
и
@SpringBootApplication
public class So52741016Application {
public static void main(String[] args) {
SpringApplication.run(So52741016Application.class, args);
}
@KafkaListener(groupId = "${service.kafka.groupId}", topics = "#{configProps.topics.![name]}")
public void listener(String in) {
}
@Bean
public SmartLifecycle createTopics(KafkaAdmin admin, ConfigProps props) {
return new SmartLifecycle() {
@Override
public int getPhase() {
return Integer.MIN_VALUE;
}
@Override
public void stop() {
}
@Override
public void start() {
try (AdminClient client = AdminClient.create(admin.getConfig())) {
CreateTopicsResult createTopics = client.createTopics(props.topics.stream()
.map(t -> new NewTopic(t.getName(), t.getNumPartitions(), t.getReplicationFactor()))
.collect(Collectors.toList()));
createTopics.all().get();
}
catch (Exception e) {
// e.printStackTrace();
}
}
@Override
public boolean isRunning() {
return false;
}
@Override
public void stop(Runnable callback) {
}
@Override
public boolean isAutoStartup() {
return true;
}
};
}
}
и
2018-10-10 11:20:25.813 ИНФО 14979 --- [ntainer # 0-0-C-1] osklKafkaMessageListenerContainer: назначенные разделы: [запрос1-4, запрос2-0, запрос1-0, запрос2-1, запрос1-1, запрос2-2, запрос1-2, request1-3]
Конечно, это только темы для продюсеров, но вы можете справиться с ними таким образом.