Apache Kafka динамически меняет топи с подпиской c во время выполнения - PullRequest
0 голосов
/ 25 мая 2020

Есть ли другой способ изменить топи c динамически в Apache Кафка с помощью Spring-Kafka ?

Я могу изменить топи c с уже определенными beans, аннотированными @KafkaListener, но в каждом имя топи c жестко запрограммировано.

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    private final KafkaTemplate kafkaTemplate;

    List<Message> messages = new ArrayList<>();


    @KafkaListener(id = "topic_listener1", topics = "test1", containerFactory = "listenerContainerFactory", autoStartup = "false")
    public void listener(Message message) {
        messages.clear();
        messages.add(message);
    }

    @KafkaListener(id = "topic_listener2", topics = "test2", containerFactory = "listenerContainerFactory", autoStartup = "false")
    public void listener2(Message message) {
        messages.clear();
        messages.add(message);
    }

и когда я хочу изменить подписанный topi c, я делаю вот так:

  @GetMapping(value = "/room/{id}")
    public List<Message> getRoomMessages(@PathVariable("id") String id) {
        registry.getListenerContainer("topic_listener1").start();
        registry.getListenerContainer("topic_listener1").stop();
          registry.getListenerContainer("topic_listener2").start();
        return messages;
    }

Подводя итог: есть ли шанс передать не последнее поле как topi c name в KafkaListener или создать новые beans, аннотированные @KafkaListener с указанным новым topi c имя?

...