Очередь RabbitMQ и ключ маршрутизации - PullRequest
0 голосов
/ 08 мая 2018

в документации https://docs.spring.io/spring-amqp/reference/htmlsingle/ я вижу

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
        key = "orderRoutingKey")
  )
  public void processOrder(Order order) {

  }

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "auto.exch"),
        key = "invoiceRoutingKey")
  )
  public void processInvoice(Invoice invoice) {

  }

Здесь 1 очередь и 2 других ключа маршрутизации, каждый за свой метод Но мой код не получает сообщение от ключа!

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = DRIVER_QUEUE, durable = "true"),
            exchange = @Exchange(value = "exchange", ignoreDeclarationExceptions = "true", autoDelete = "true"),
            key = "order")
    )
    public String getOrders(byte[] message) throws InterruptedException {
         System.out.println("Rout order");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = DRIVER_QUEUE, durable = "true"),
            exchange = @Exchange(value = "exchange", ignoreDeclarationExceptions = "true", autoDelete = "true"),
            key = "invoice")
    )
    public String getOrders(byte[] message) throws InterruptedException {
         System.out.println("Rout invoice");
    }

они все получают сообщение из очереди и не видят ключ ... сайт отправляет в очередь сообщение с ключом "invoice" и вижу в консоли "Route order" В чем проблема?? Большое спасибо!

rabbitmq 3.7.3 весна 4.2.9 org.springframework.amqp 1.7.5

1 Ответ

0 голосов
/ 09 мая 2018

Ошибка в том, что вы отправляете все сообщения в одну очередь.

Вы должны использовать разные очереди для каждого слушателя. Ваши привязки просто говорят, что сообщения с RK = "invoice" и RK = "order" должны отправляться в одну и ту же очередь, а не то, что слушатель обрабатывает элементы очереди с этим RK.

Вы должны связать, например, обмен на DRIVER_QUEUE1 (например, "очереди-заказы") с помощью ключа "invoice" и обмен на DRIVER_QUEUE2 (например, "очереди-счета-фактуры") с помощью ключа "order". Таким образом, вы разделяете сообщения и можете разместить двух слушателей, одного для счетов-фактур и одного для заказов. Например. как то так:

@RabbitListener(queues = "queue-orders")
public void handleOrders(@Payload Order in,
      @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String key) {
   logger.info("Key: {}, msg: {}",key,in.toString());
}


@RabbitListener(queues = "queue-invoices")
public void handleInvoices(@Payload Invoice in, 
      @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String key) {
   logger.info("Key: {}, msg: {}",key,in.toString());
}

Мне не нравится полная полная аннотация, так как, когда конфигурация брокера выполнена, полная аннотация IMHO становится бесполезной (или, что лучше, добавляет бесполезную проверку для меня бесполезной). Но если вы предпочитаете, вся аннотация должна выглядеть как

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "queue-orders", durable = "true"),
        exchange = @Exchange(value = "exchange", ignoreDeclarationExceptions = "true", autoDelete = "true"),
        key = "invoice")
)

тогда вы можете отправлять сообщения через convertAndSend (exchangename, routingkey, object), как в

Order order = new Order(...);
rabbitTemplate.convertAndSend("exchange", "order", order);
Invoice invoice = new Invoice(...);
rabbitTemplate.convertAndSent("exchange", "invoice", invoice);

Если ваше загрузочное приложение реализует RabbitListenerConfigurer, то вы можете настроить все как, например,

@SpringBootApplication
public class MyApplication implements RabbitListenerConfigurer {
   // other config stuff here....

    @Bean("queue1")
    public Queue queue1() {
        return new Queue("queue-orders", true);
    }

    @Bean("queue2")
    public Queue queue2() {
        return new Queue("queue-invoices", true);
    }

    @Bean
    public Binding binding1(@Qualifier("queue1") Queue queue, TopicExchange exchange) {        
        return BindingBuilder.bind(queue).to(exchange).with("invoice");
    }

    @Bean
    public Binding binding2(@Qualifier("queue2") Queue queue, TopicExchange exchange) {        
        return BindingBuilder.bind(queue).to(exchange).with("order");
    }

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(consumerJackson2MessageConverter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    // Exchange.
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchange");
    }
}

Надеемся, что ответили на ваш запрос.

...