Есть ли другой способ изменить топи c динамически в Apache Кафка с помощью Spring-Kafka ?
Я могу изменить топи c с уже определенными beans, аннотированными @KafkaListener
, но в каждом имя топи c жестко запрограммировано.
@Autowired
private KafkaListenerEndpointRegistry registry;
private final KafkaTemplate kafkaTemplate;
List<Message> messages = new ArrayList<>();
@KafkaListener(id = "topic_listener1", topics = "test1", containerFactory = "listenerContainerFactory", autoStartup = "false")
public void listener(Message message) {
messages.clear();
messages.add(message);
}
@KafkaListener(id = "topic_listener2", topics = "test2", containerFactory = "listenerContainerFactory", autoStartup = "false")
public void listener2(Message message) {
messages.clear();
messages.add(message);
}
и когда я хочу изменить подписанный topi c, я делаю вот так:
@GetMapping(value = "/room/{id}")
public List<Message> getRoomMessages(@PathVariable("id") String id) {
registry.getListenerContainer("topic_listener1").start();
registry.getListenerContainer("topic_listener1").stop();
registry.getListenerContainer("topic_listener2").start();
return messages;
}
Подводя итог: есть ли шанс передать не последнее поле как topi c name в KafkaListener
или создать новые beans, аннотированные @KafkaListener
с указанным новым topi c имя?