Можно ли добавить очередь replyTo в компонент rabbitmq верблюда? - PullRequest
0 голосов
/ 22 мая 2019

Я хочу реализовать шаблон запроса-ответа на Apache Camel через RabbitMQ. Для этого я создал одну очередь (myQueue), куда я отправляю несколько сообщений и жду ответа в другую очередь (repQueue). Для этого я создал маршрут, подобный следующему.

from("direct:reqReply")
    .setHeader("rabbitmq.REPLY_TO", simple("repQueue"))
    .to("rabbitmq://localhost:5672/ex4?queue=myQueue&autoDelete=false");

И для обслуживания этих сообщений у меня есть другой маршрут в другом приложении, которое использует сообщения из myQueue и отправляет сообщения в repQueue после обработки, как показано ниже.

from("rabbitmq://localhost:5672/ex4?queue=myQueue&autoDelete=false")
    .process(new Processor() {
        public void process(Exchange ex) throws Exception {
            String val = new String((byte[]) ex.getIn().getBody());
            val = val.toUpperCase();
            ex.getIn().setBody(val);
        }
    })
    .to("rabbitmq://localhost:5672/ex4?queue=repQueue&autoDelete=false");

Итак, пока я отправляю сообщения в myQueue, как показано ниже,

String resp = template.requestBody("direct:reqReply", "first message", String.class);

ответ не приходит и получил исключение org.apache.camel.ExchangeTimedOutException: сообщение OUT не было получено в течение: 20000 миллисекунд. Ответное сообщение с correlationID: Camel-ID-PC-51756-1558533236655-0-2 не получено по назначению: amq.gen-Wbb9ObprY_7Gq-eqac44ZQ.

Я проверил источник здесь: https://github.com/apache/camel/tree/master/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq и обнаружил следующее в классе RabbitMQEndpoint.

// camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
    private String replyToType = ReplyToType.Temporary.name();
    // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
    private String replyTo;

Я нашел следующее в классе RabbitMQProducer

if (getEndpoint().getReplyTo() != null) {
    // specifying reply queues is not currently supported
    throw new IllegalArgumentException("Specifying replyTo " + getEndpoint().getReplyTo() + " is currently not supported.");
 } else {
  replyManager = createReplyManager();
  log.debug("Using RabbitMQReplyManager: {} to process replies from temporary queue", replyManager);
 }

Итак, возможно ли добавить очередь replyTo (не временную очередь) в компонент rabbitmq?

...