Проблема с RabbitMQ Прямой ответ на Spring - PullRequest
0 голосов
/ 16 января 2020

Я работаю над приложением, которое отправляет сообщение как сервер, затем данное сообщение изменяется и отправляется обратно в очередь amq.rabbitmq.reply-to с помощью прямого ответа. Я следовал учебному пособию https://www.rabbitmq.com/direct-reply-to.html, но у меня есть некоторые проблемы с его реализацией. В моем случае, как я понял, мне нужно использовать сообщение из псевдо-очереди amq.rabbitmq.reply-to в режиме без подтверждения, который в моем случае равен MessageListenerContainer. Вот мой конфиг:

@Bean
    public Jackson2JsonMessageConverter messageConverter() {    
        ObjectMapper mapper = new ObjectMapper();
        return new Jackson2JsonMessageConverter(mapper);
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        rabbitTemplate.setReplyAddress("amq.rabbitmq.reply-to");
        return rabbitTemplate;
    }

    @Bean
    MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory ) {       
        DirectMessageListenerContainer directMessageListenerContainer = new DirectMessageListenerContainer();
        directMessageListenerContainer.setConnectionFactory(connectionFactory);
        directMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.NONE);
        directMessageListenerContainer.setQueueNames("amq.rabbitmq.reply-to");
        directMessageListenerContainer.setMessageListener(new PracticalMessageListener());
        return directMessageListenerContainer;

    }

Сообщение отправляется как JSON через кадр SEND по протоколу STOM и преобразуется. Затем новая очередь
создается динамически и добавляется в MessageListenerContainer. Поэтому, когда сообщение поступает в брокер, я хотел бы изменить его на стороне сервера и отправить обратно на amq.rabbitmq.reply-to, а исходное сообщение отправить на ключ маршрутизации messageTemp.getTo(), который подписан на фрейм SUBSCRIBE в STOMP.

  @MessageMapping("/private")
  public void send2(MessageTemplate messageTemp) throws Exception {
      MessageTemplate privateMessage = new MessageTemplate(messageTemp.getPerson(),
              messageTemp.getMessage(), 
              messageTemp.getTo());

     AbstractMessageListenerContainer abstractMessageListenerContainer =
              (AbstractMessageListenerContainer) mlc;

       // here's the queue added to listener container   
      abstractMessageListenerContainer.addQueueNames(messageTemp.getTo());

      MessageProperties mp = new MessageProperties();
      mp.setReplyTo("amq.rabbitmq.reply-to");
      mp.setCorrelationId("someId");

      Jackson2JsonMessageConverter smc = new Jackson2JsonMessageConverter();
      Message message = smc.toMessage(messageTemp, mp);


      rabbitTemplate.sendAndReceive( 
              messageTemp.getTo() , message);
  }

Сообщение модифицируется методом onMessage, когда сообщение отправляется на messageTemp.getTo() ключ маршрутизации

@Component
public class PracticalMessageListener implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        System.out.println(("message listener.."));
        String body = "{ \"processing\": \"123456789\"}";
       MessageProperties properties = new MessageProperties();

       // some business logic on the message body

        properties.setCorrelationId(message.getMessageProperties().getCorrelationId());
        Message responseMessage = new Message(body.getBytes(), properties);

        rabbitTemplate.convertAndSend("", 
                message.getMessageProperties().getReplyTo(), responseMessage);
    }

Я могу неправильно понять концепцию прямого ответа и документацию, которая гласит:

Использовать из псевдо-очереди amq.rabbitmq.reply-to в режиме без подтверждения. Нет необходимости сначала объявлять эту «очередь», хотя клиент может сделать это, если захочет.

Вопрос в том, где мне потребляться из этой очереди? И как я могу получить доступ к этому измененному сообщению, если я получаю сообщение об ошибке:

2020-01-15 22:17:09.688  WARN 96222 --- [pool-1-thread-5] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
Caused by: java.lang.NullPointerException: null
    at com.patrykmaryn.spring.second.PracticalMessageListener.onMessage(PracticalMessageListener.java:50) ~[classes/:na]

, которое приходит с того места, когда я вызываю rabbitTemplate.convertAndSend в PracticalMessageListener

РЕДАКТИРОВАТЬ

Я избавился от настройки amq.rabbitmq.reply-to в DirectMessageListenerContainer и реализовал DirectReplyToMessageListenerContainer:

@Bean
    DirectReplyToMessageListenerContainer drtmlc (ConnectionFactory connectionFactory) {
        DirectReplyToMessageListenerContainer drtmlc =
                new DirectReplyToMessageListenerContainer(connectionFactory);
        drtmlc.setConnectionFactory(connectionFactory);
        drtmlc.setAcknowledgeMode(AcknowledgeMode.NONE);
        drtmlc.setMessageListener(new DirectMessageListener());
        return drtmlc;
    }

Проблема должна быть в onMessage метод, который не позволяет вызывать любой метод отправки на rabbitTemplate, я пробовал с другими существующими ключами маршрутизации и обменами. Прослушивание происходит из очереди, определенной с помощью ключа маршрутизации messageTemp.getTo().

@Override
    public void onMessage(Message message) {
        System.out.println(("message listener.."));

        String receivedRoutingKey = message.getMessageProperties()
           .getReceivedRoutingKey();
        System.out.println(" This is received routingkey: " + 
            receivedRoutingKey);

           /// ..... rest of code goes here

        rabbitTemplate.convertAndSend("", 
                message.getMessageProperties().getReplyTo(), 
                responseMessage);

Где messageTemp.getTo() - ключ маршрутизации, определенный во время выполнения, путем выбора получателя, например, если я выберу «user1», он выведет «user1».

Это первая попытка отправить сообщение:

2020-01-16 02:22:20.213 DEBUG 28490 --- [nboundChannel-6] .WebSocketAnnotationMethodMessageHandler : Searching methods to handle SEND /app/private session=45yca5sy, lookupDestination='/private'
2020-01-16 02:22:20.214 DEBUG 28490 --- [nboundChannel-6] .WebSocketAnnotationMethodMessageHandler : Invoking PracticalTipSender#send2[1 args]
2020-01-16 02:22:20.239  INFO 28490 --- [nboundChannel-6] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=user1, consumerTag=amq.ctag-Evyiweew4C-K1mXmy2XqUQ identity=57b19488] started
2020-01-16 02:22:20.268  INFO 28490 --- [nboundChannel-6] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2020-01-16 02:22:20.269  INFO 28490 --- [nboundChannel-6] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to]
2020-01-16 02:22:20.286  INFO 28490 --- [nboundChannel-6] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-IXWf-zEyI34xzQSSfbijzg identity=4bedbba5] started

И второе, что не удалось:

2020-01-16 02:23:20.247 DEBUG 28490 --- [nboundChannel-3] .WebSocketAnnotationMethodMessageHandler : Searching methods to handle SEND /app/private session=45yca5sy, lookupDestination='/private'
2020-01-16 02:23:20.248 DEBUG 28490 --- [nboundChannel-3] .WebSocketAnnotationMethodMessageHandler : Invoking PracticalTipSender#send2[1 args]
2020-01-16 02:23:20.248  WARN 28490 --- [nboundChannel-3] o.s.a.r.l.DirectMessageListenerContainer : Queue user1 is already configured for this container: org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer@3b152928, ignoring add
2020-01-16 02:23:20.250  WARN 28490 --- [nboundChannel-3] o.s.a.r.l.DirectMessageListenerContainer : Queue user1 is already configured for this container: org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer@3b152928, ignoring add
message listener..
 This is received routingkey: user1
2020-01-16 02:23:20.271  WARN 28490 --- [pool-1-thread-5] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception

РЕДАКТИРОВАТЬ

Поместить DirectReplyToMessageListenerContainer в отдельный класс и установить его MessageListener как @Bean, а также directMessageListenerContainer.setMessageListener(practicalMessageListener()); как @Bean, казалось, избавились от NPE. Но все же даже ответ идет на amq.rabbitmq.reply-to.g2dkABVyYWJ....., его, кажется, не слушают в DirectReplyToMessageListenerContainer drtmlc.

@Component
class DirectMessageListener implements MessageListener {
    // This doesn't get invoked...
    @Override
    public void onMessage(Message message) {
        System.out.println("direct reply message sent..");

    }
}

@Component
class ReplyListener {

    @Bean
    public DirectMessageListener directMessageListener() {
        return new DirectMessageListener(); 
    }

    @Bean
    DirectReplyToMessageListenerContainer drtmlc (ConnectionFactory connectionFactory) {
        DirectReplyToMessageListenerContainer drtmlc =
                new DirectReplyToMessageListenerContainer(connectionFactory);
        drtmlc.setConnectionFactory(connectionFactory);
        drtmlc.setAcknowledgeMode(AcknowledgeMode.NONE);
        drtmlc.setMessageListener(directMessageListener());
        return drtmlc;
    }
}

1 Ответ

1 голос
/ 16 января 2020

Да, вы неправильно поняли функцию.

Каждый канал получает свою собственную псевдо-очередь; вы можете получать сообщения только с того же канала, поэтому контейнер общего прослушивателя не сможет его взломать.

directMessageListenerContainer.setQueueNames("amq.rabbitmq.reply-to");

Вы просто не можете этого сделать.

Фреймворк уже поддерживает прямой ответ непосредственно внутри RabbitTemplate. RabbitTemplate имеет свой собственный DirectReplyToMessageListenerContainer, который поддерживает пул каналов.

Каждый запрос проверяет канал, и ответ возвращается туда, а затем канал возвращается в пул для повторного использования другим запросом.

Использование RabbitTemplate.convertSendAndReceive(); Поведение по умолчанию (в последних версиях) будет автоматически использовать прямой ответ.

РЕДАКТИРОВАТЬ

Почему бы не позволить фреймворку выполнить всю тяжелую работу, и вы просто сосредоточитесь на ваш бизнес логи c:

@SpringBootApplication
public class So59760805Application {

    public static void main(String[] args) {
        SpringApplication.run(So59760805Application.class, args);
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames("foo");
        container.setMessageListener(new MessageListenerAdapter(new MyListener()));
        return container;
    }

    @Bean
    public MyExtendedTemplate template(ConnectionFactory cf) {
        return new MyExtendedTemplate(cf);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> System.out.println(template.convertSendAndReceive("", "foo", "test"));
    }

}

class MyListener {

    public String handleMessage(String in) {
        return in.toUpperCase();
    }

}

class MyExtendedTemplate extends RabbitTemplate {

    MyExtendedTemplate(ConnectionFactory cf) {
        super(cf);
    }

    @Override
    public void onMessage(Message message) {
        System.out.println("Response received (before conversion): " + message);
        super.onMessage(message);
    }

}

В шаблоне кролика по умолчанию используется прямой ответ (внутренне).

Response received (before conversion): (Body:'TEST' MessageProperties [headers={}, correlationId=1, ...receivedRoutingKey=amq.rabbitmq.reply-to.g2dkAA5yYWJiaXRAZ29sbHVtMgAAeE0AAADmAw==.RQ/uxjR79PX/hZF+7iAdWw==, ...
TEST
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...