Spring 2.1.0.M4 rabbitmq объявляет очереди и связывает их со слушателями во время выполнения - PullRequest
0 голосов
/ 14 октября 2018

Допустим, у меня есть заявленный слушатель:

Listener.java

@RabbitListener(id = "test listener 1")
    public String test2(String req) {
        return req + " result";
}

Я пытаюсь выставить его через очередь во время выполнения:

ListenerTest.java

Queue declaredQueue = new Queue("new.queue", false);

admin.declareQueue(declaredQueue);

SimpleMessageListenerContainer listener = (SimpleMessageListenerContainer) 
            registry.getListenerContainer("test listener 1");
listener.addQueues(declaredQueue);

И затем я пытаюсь отправить сообщение о вновь объявленной очереди:

String result = template.convertSendAndReceiveAsType("new.queue", "req", ParameterizedTypeReference.forType(String.class));

Но он просто отключается и возвращает ноль.

Когда я проверяю слушателя в отладчике, я не вижу никаких потребителей, связанных с новой очередью enter image description here

Вы можете найти по конфигурации кролика здесь и остальные источники, чтобы проверить это здесь .

Стоит отметить, что эта точная настройка работает в весенней загрузочной версии 2.0.5.RELEASE , так что это может быть ошибкой.Мне нужно найти способ реинициализировать потребителей.

Ответы [ 2 ]

0 голосов
/ 14 октября 2018

Добавление очередей во время выполнения заставит контейнер перерабатывать своих потребителей (эквивалент остановки и перезапуска контейнера).См. https://github.com/spring-projects/spring-amqp/blob/master/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java#L687 Это связано с тем, как рассчитан потребитель;каждый потребительский поток потребляет из нескольких очередей.

Изменение количества потребителей не перезапускает всех потребителей;см. https://github.com/spring-projects/spring-amqp/blob/master/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java#L168

Новый DirectMessageListenerContainer не должен перезапускать своих потребителей при добавлении очередей (в очереди есть хотя бы один потребитель).

Однако онне поддерживает динамическое параллельное масштабирование.

0 голосов
/ 14 октября 2018

После выпуска 2.1.0 SimpleMessageListenerContainer больше не переопределяет AbstractMessageListenerContainer#addQueues, из-за чего SimpleMessageListenerContainer#queuesChanged не вызывается (вероятно, ошибка).Однако SimpleMessageListenerContainer#addQueueNames переопределяется и работает.Так что это либо должно быть изменено на

SimpleMessageListenerContainer listener = (SimpleMessageListenerContainer) 
            registry.getListenerContainer("test listener 1");
listener.addQueueNames(declaredQueue.getName());

, либо queuesChanged должно вызываться вручную

SimpleMessageListenerContainer listener = (SimpleMessageListenerContainer) 
            registry.getListenerContainer("test listener 1");
listener.addQueues(declaredQueue);
Method method = SimpleMessageListenerContainer.class.getDeclaredMethod("queuesChanged");
method.setAccessible(true);
method.invoke(listener);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...