Как прослушать существующую очередь в Spring AMQP? - PullRequest
0 голосов
/ 30 августа 2018

У меня есть удаленный сервер RabbitMQ, у которого есть несколько очередей, которые я хочу прослушать. Я попробовал это:

@RabbitListener(queues = "queueName")
public void receive(String message) {
    System.out.println(message);
}

Но он попытался создать новую очередь. Результат предсказуем - доступ запрещен.

o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: queueName

Я не объявлял никакой очереди другим способом.

Как я могу прослушать существующую очередь на удаленном сервере? Кроме того, есть ли способ проверить, существует ли эта очередь? И я увидел эту строку

@RabbitListener(queues = "#{autoDeleteQueue2.name}")

в учебнике. Что означает #{queueName.name}? 1013 *

Журналы и начало трассировки стека:

2018-08-30 22:10:21.968  WARN 12124 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: queueName
2018-08-30 22:10:21.991  WARN 12124 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[queueName]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:588) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:996) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

Ответы [ 3 ]

0 голосов
/ 30 августа 2018

Даже если у вас нет разрешения на настройку посредника, queueDeclarePassive, используемый слушателем, разрешен (он проверяет наличие очереди).

o.s.a.r.listener.BlockingQueueConsumer: не удалось объявить очередь: имя-очереди

Это просто означает, что очередь не существует.

@RabbitListener(queues = "#{autoDeleteQueue2.name}")

Используется для получения имени очереди во время выполнения (когда у вас есть разрешение на создание очередей).

, например

@Bean
public AnonymousQueue autoDeleteQueue2() {
    return new AnonymousQueue();
}

Spring добавит эту очередь в брокер со случайным уникальным именем. Слушатель тогда настроен с фактическим именем очереди.

0 голосов
/ 31 августа 2018

Вот пример того, как прослушивать определенную «очередь» с помощью Spring Integration:

SpringIntegrationConfiguration.java

@Configuration
public class SpringIntegrationConfiguration {

@Value("${rabbitmq.queueName}")
private String queueName;

@Bean
public IntegrationFlow ampqInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queueName))
            .handle(System.out::println)
            .get();
  }
}

ApplicationConfiguration.java

@Configuration
public class ApplicationConfiguration {

@Value("${rabbitmq.topicExchangeName}")
private String topicExchangeName;

@Value("${rabbitmq.queueName}")
private String queueName;

@Value("${rabbitmq.routingKey}")
private String routingKey;

@Bean
Queue queue() {
    return new Queue(queueName, false);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange(topicExchangeName);
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(routingKey);
  }

}

Application.yml

rabbitmq:
topicExchangeName: spring-boot-exchange
queueName: spring-boot
routingKey: foo.bar.#
0 голосов
/ 30 августа 2018

Вот пример того, как прослушивать очередь с rabbitMq:

@Component
public class RabbitConsumer implements MessageListener {

    @RabbitListener(bindings =
    @QueueBinding(
            value = @Queue(value = "${queue.topic}", durable = "true"),
            exchange = @Exchange(value = "${queue.exchange}", type = ExchangeTypes.FANOUT, durable = "true")
    )
    )
    @Override
    public void onMessage(Message message) {
        // ...
    }
}

И конфиг (application.yaml):

queue:
  topic: mytopic
  exchange: myexchange

В rabbitmq потребитель связан с обменом. Это позволяет вам определить, как сообщения должны потребляться (все ли потребители слушают все сообщения? Достаточно ли этого, если сообщение прочитал только один потребитель? ...)

...