динамическое создание контейнера в spring-rabbitmq за очередь - PullRequest
0 голосов
/ 09 июля 2019

Мое приложение имеет несколько очередей (имена очередей будут взяты из базы данных), и каждая из них будет ежедневно потреблять огромные данные. Для этой цели мне нужно создать один контейнер и прослушиватель сообщений для каждой очереди, чтобы для каждой очереди был отдельный поток. В дополнение к этому, могут быть некоторые очереди, создаваемые динамически, и мне нужно, чтобы контейнер был назначен для вновь созданных очередей

Мой потребительский класс начинается, как показано ниже

// Ниже показано, с чего начинается мой класс

@Component
public class RequestConsumer implements MessageListener {```
//and below is the code by which I am creating Message listner
@Bean
    @Scope(value = "prototype")
    public SimpleMessageListenerContainer simpleMessageListenerNotification(
            ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer simpleMessageListenerContainer =
                new SimpleMessageListenerContainer(connectionFactory);
        RabbitAdmin rabbitAdmin = getRabbitAdmin(connectionFactory);
        RequestConsumer RequestConsumer = (RequestConsumer) beanFactory.getBean("requestConsumer");
        simpleMessageListenerContainer.setupMessageListener(RequestConsumer);
        simpleMessageListenerContainer.setAutoDeclare(true);
        for (String queueName : requestConsumerQueueList()) {
            Queue queue = new Queue(queueName);
            rabbitAdmin.declareQueue(queue);
            simpleMessageListenerContainer.addQueues(queue);
        }
        simpleMessageListenerContainer.start();
        return simpleMessageListenerContainer;
    }

Мой текущий код создает только один контейнер с одним messageListner для всех очередей, в то время как я ожидаю отдельный контейнер для каждой очереди.

1 Ответ

0 голосов
/ 09 июля 2019

Во-первых, вы не должны объявлять очереди в определении компонента - это слишком рано в жизненном цикле контекста.

Вы также не должны вызывать start() в определении бина - опять же, слишком рано.

Вы должны сделать что-то вроде этого:

@SpringBootApplication
public class So56951298Application {

    public static void main(String[] args) {
        SpringApplication.run(So56951298Application.class, args);
    }

    @Bean
    public Declarables queues() {
        return new Declarables(Arrays.asList(new Queue("q1"), new Queue("q2")));
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            Queue queue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue);
        container.setMessageListener(msg -> System.out.println(msg));
        return container;
    }

    @Bean
    public ApplicationRunner runner(ConnectionFactory connectionFactory, Declarables queues) {
        return args -> {
            queues.getDeclarables().forEach(dec -> container(connectionFactory, (Queue) dec).start());
        };
    }

}

Фреймворк будет автоматически объявлять очереди в нужное время (при условии, что в контексте приложения есть RabbitAdmin (который Spring Boot автоматически настраивает).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...