Кафка Спринг: Как создавать Слушатели динамически или в цикле? - PullRequest
0 голосов
/ 11 декабря 2018

У меня есть 4 слушателя ConsumerFactory, которые читают по 4 различным темам, подобным этому:

@KafkaListener(
      id = "test1",
      topicPattern  = "test.topic1",
      groupId = "pp-test1")
  public void listenTopic1(ConsumerRecord<String, String> record) {
    System.out.println("Topic is: " + record.topic());   
  }

Но у нас будет 50 тем, и я хочу настроить как минимум 25 слушателей для исполнения betetr.Как я могу сделать это динамически вместо того, чтобы вручную писать 25 слушателей?

Ответы [ 2 ]

0 голосов
/ 13 декабря 2018

Допустим, ваш метод слушателя является частью приложения Springboot под названием Listener.Прочитайте имя темы из приложения конфигурации yml:

@KafkaListener(
  topics = "${ai.kafka.consumer.topic-name}")
public void listenTopic1(ConsumerRecord<String, String> record) {
System.out.println("Topic is: " + record.topic());   
}

, а затем разверните столько слушателей, сколько хотите, каждый с различным значением имени темы в application.yml.(Если вы используете Docker-контейнер, это проще).

0 голосов
/ 11 декабря 2018

Вы не можете создавать @KafkaListener s программно, только контейнеры дискретных слушателей (с настраиваемым слушателем).

Это можно сделать программным путем, создавая дочерний контекст приложения для каждого слушателя.

РЕДАКТИРОВАТЬ

@SpringBootApplication
public class So53715268Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(So53715268Application.class, args);
        for (int i = 0; i < 2; i++) {
            AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
            child.setParent(context);
            child.register(ChildConfig.class);
            Properties props = new Properties();
            props.setProperty("group", "group." + i);
            props.setProperty("topic", "topic" + i);
            PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
            child.getEnvironment().getPropertySources().addLast(pps);
            child.refresh();
        }
    }

}

и

@Configuration
@EnableKafka
public class ChildConfig {

    @Bean
    public Listener listener() {
        return new Listener();
    }

}

и

public class Listener {

    @KafkaListener(id = "${group}", topics = "${topic}")
    public void listen(String in) {
        System.out.println(in);
    }

}

и

: partitions assigned: [topic0-0]
: partitions assigned: [topic1-0]

Обратите внимание, что есливы используете Spring Boot, дочерний класс конфигурации и слушатель должны быть в другом пакете с основным приложением (и не в подпакете).

...