Ошибка в том, что вы отправляете все сообщения в одну очередь.
Вы должны использовать разные очереди для каждого слушателя. Ваши привязки просто говорят, что сообщения с RK = "invoice" и RK = "order" должны отправляться в одну и ту же очередь, а не то, что слушатель обрабатывает элементы очереди с этим RK.
Вы должны связать, например, обмен на DRIVER_QUEUE1 (например, "очереди-заказы") с помощью ключа "invoice" и обмен на DRIVER_QUEUE2 (например, "очереди-счета-фактуры") с помощью ключа "order". Таким образом, вы разделяете сообщения и можете разместить двух слушателей, одного для счетов-фактур и одного для заказов. Например. как то так:
@RabbitListener(queues = "queue-orders")
public void handleOrders(@Payload Order in,
@Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String key) {
logger.info("Key: {}, msg: {}",key,in.toString());
}
@RabbitListener(queues = "queue-invoices")
public void handleInvoices(@Payload Invoice in,
@Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String key) {
logger.info("Key: {}, msg: {}",key,in.toString());
}
Мне не нравится полная полная аннотация, так как, когда конфигурация брокера выполнена, полная аннотация IMHO становится бесполезной (или, что лучше, добавляет бесполезную проверку для меня бесполезной). Но если вы предпочитаете, вся аннотация должна выглядеть как
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-orders", durable = "true"),
exchange = @Exchange(value = "exchange", ignoreDeclarationExceptions = "true", autoDelete = "true"),
key = "invoice")
)
тогда вы можете отправлять сообщения через convertAndSend (exchangename, routingkey, object), как в
Order order = new Order(...);
rabbitTemplate.convertAndSend("exchange", "order", order);
Invoice invoice = new Invoice(...);
rabbitTemplate.convertAndSent("exchange", "invoice", invoice);
Если ваше загрузочное приложение реализует RabbitListenerConfigurer, то вы можете настроить все как, например,
@SpringBootApplication
public class MyApplication implements RabbitListenerConfigurer {
// other config stuff here....
@Bean("queue1")
public Queue queue1() {
return new Queue("queue-orders", true);
}
@Bean("queue2")
public Queue queue2() {
return new Queue("queue-invoices", true);
}
@Bean
public Binding binding1(@Qualifier("queue1") Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("invoice");
}
@Bean
public Binding binding2(@Qualifier("queue2") Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order");
}
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
return rabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(consumerJackson2MessageConverter());
return factory;
}
@Override
public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
// Exchange.
@Bean
public TopicExchange exchange() {
return new TopicExchange("exchange");
}
}
Надеемся, что ответили на ваш запрос.