Я хочу реализовать шаблон запроса-ответа на Apache Camel через RabbitMQ. Для этого я создал одну очередь (myQueue), куда я отправляю несколько сообщений и жду ответа в другую очередь (repQueue).
Для этого я создал маршрут, подобный следующему.
from("direct:reqReply")
.setHeader("rabbitmq.REPLY_TO", simple("repQueue"))
.to("rabbitmq://localhost:5672/ex4?queue=myQueue&autoDelete=false");
И для обслуживания этих сообщений у меня есть другой маршрут в другом приложении, которое использует сообщения из myQueue и отправляет сообщения в repQueue после обработки, как показано ниже.
from("rabbitmq://localhost:5672/ex4?queue=myQueue&autoDelete=false")
.process(new Processor() {
public void process(Exchange ex) throws Exception {
String val = new String((byte[]) ex.getIn().getBody());
val = val.toUpperCase();
ex.getIn().setBody(val);
}
})
.to("rabbitmq://localhost:5672/ex4?queue=repQueue&autoDelete=false");
Итак, пока я отправляю сообщения в myQueue, как показано ниже,
String resp = template.requestBody("direct:reqReply", "first message", String.class);
ответ не приходит и получил исключение org.apache.camel.ExchangeTimedOutException: сообщение OUT не было получено в течение: 20000 миллисекунд. Ответное сообщение с correlationID: Camel-ID-PC-51756-1558533236655-0-2 не получено по назначению: amq.gen-Wbb9ObprY_7Gq-eqac44ZQ.
Я проверил источник здесь: https://github.com/apache/camel/tree/master/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq
и обнаружил следующее в классе RabbitMQEndpoint.
// camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
private String replyToType = ReplyToType.Temporary.name();
// camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
private String replyTo;
Я нашел следующее в классе RabbitMQProducer
if (getEndpoint().getReplyTo() != null) {
// specifying reply queues is not currently supported
throw new IllegalArgumentException("Specifying replyTo " + getEndpoint().getReplyTo() + " is currently not supported.");
} else {
replyManager = createReplyManager();
log.debug("Using RabbitMQReplyManager: {} to process replies from temporary queue", replyManager);
}
Итак, возможно ли добавить очередь replyTo (не временную очередь) в компонент rabbitmq?