Временная очередь исчезает, прежде чем я могу получить сообщение от нее в RabbitMQ с использованием JMS - PullRequest
0 голосов
/ 09 июля 2019

Я пытаюсь использовать RabbitMQ типичным способом запроса / ответа (RPC), используя JMS и временные очереди.

Я создаю сообщение с запросом, устанавливаю ответ на очередь во вновь созданную временную очередь, затем пытаюсь ответить на эту очередь, а затем пытаюсь получить ответ из этой очереди. Проблема, с которой я сталкиваюсь, заключается в том, что эта очередь исчезает, прежде чем я могу ее использовать.

Я попытался отладить, и похоже, что временная очередь создается не на этапе запроса, а на этапе ответа, поэтому время жизни этой очереди связано с соединением-ответчиком (которое я закрываю до получения ответа) вместо запрашивающее соединение. Я что-то не так делаю или это ошибка?

import static org.junit.Assert.assertEquals;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.junit.Test;

import com.rabbitmq.jms.admin.RMQConnectionFactory;

public class TmpQueueTest {

  @Test
  public void test() throws Exception
  {

    RMQConnectionFactory rmqConnectionFactory = new RMQConnectionFactory();
    rmqConnectionFactory.setHost("127.0.0.1");
    rmqConnectionFactory.setPort(5672);

    try(Connection connection = rmqConnectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE))
    {

      connection.start();
      {
          Destination replyToQ = session.createTemporaryQueue();
          Destination destinationQ = session.createQueue("dest");
          MessageProducer producer = session.createProducer(destinationQ);

          TextMessage message = session.createTextMessage("RTQ test");
          message.setJMSReplyTo(replyToQ);
          producer.send(message);

          try(Connection connection2 = rmqConnectionFactory.createConnection();
              Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE))
          {
            connection2.start();
            MessageConsumer consumer = session2.createConsumer(destinationQ);
            Message received = consumer.receive(1000);

            assertEquals(replyToQ, received.getJMSReplyTo());

            MessageProducer replierProducer = session2.createProducer(received.getJMSReplyTo());
            replierProducer.send(received);
          }

          MessageConsumer responseConsumer = session.createConsumer(replyToQ);
          TextMessage receivedResponse = (TextMessage) responseConsumer.receive(1000);

          System.out.println("Message " + message);
          System.out.println("Received message " + receivedResponse);
          assertEquals(message.getText(), receivedResponse.getText());
        }
      }

    }

}
...