Я пытаюсь использовать RabbitMQ типичным способом запроса / ответа (RPC), используя JMS и временные очереди.
Я создаю сообщение с запросом, устанавливаю ответ на очередь во вновь созданную временную очередь, затем пытаюсь ответить на эту очередь, а затем пытаюсь получить ответ из этой очереди. Проблема, с которой я сталкиваюсь, заключается в том, что эта очередь исчезает, прежде чем я могу ее использовать.
Я попытался отладить, и похоже, что временная очередь создается не на этапе запроса, а на этапе ответа, поэтому время жизни этой очереди связано с соединением-ответчиком (которое я закрываю до получения ответа) вместо запрашивающее соединение. Я что-то не так делаю или это ошибка?
import static org.junit.Assert.assertEquals;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.junit.Test;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
public class TmpQueueTest {
@Test
public void test() throws Exception
{
RMQConnectionFactory rmqConnectionFactory = new RMQConnectionFactory();
rmqConnectionFactory.setHost("127.0.0.1");
rmqConnectionFactory.setPort(5672);
try(Connection connection = rmqConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE))
{
connection.start();
{
Destination replyToQ = session.createTemporaryQueue();
Destination destinationQ = session.createQueue("dest");
MessageProducer producer = session.createProducer(destinationQ);
TextMessage message = session.createTextMessage("RTQ test");
message.setJMSReplyTo(replyToQ);
producer.send(message);
try(Connection connection2 = rmqConnectionFactory.createConnection();
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE))
{
connection2.start();
MessageConsumer consumer = session2.createConsumer(destinationQ);
Message received = consumer.receive(1000);
assertEquals(replyToQ, received.getJMSReplyTo());
MessageProducer replierProducer = session2.createProducer(received.getJMSReplyTo());
replierProducer.send(received);
}
MessageConsumer responseConsumer = session.createConsumer(replyToQ);
TextMessage receivedResponse = (TextMessage) responseConsumer.receive(1000);
System.out.println("Message " + message);
System.out.println("Received message " + receivedResponse);
assertEquals(message.getText(), receivedResponse.getText());
}
}
}
}