Как объединить два сообщения в один маршрут? - PullRequest
0 голосов
/ 24 мая 2019

У меня есть один маршрут, который собирает заголовки и отправляет запрос в другую очередь, из которой я жду ответа (мне нужно только тело ответа).Проблема в том, что и заголовки, и ответ, который я хочу получить в одном сообщении по одному маршруту, но теперь я получаю два сообщения по одному маршруту (мне нужно объединить тело ответа и заголовки).как это сделать?

from("jms:queue:aaa")
    .log("incoming message")
    .process(exchange -> {
        exchange.getIn().setHeader("c", c);
        exchange.getIn().setHeader("d", d);
        exchange.getIn().setHeader("a", a);
        exchange.getIn().setHeader("b", b);
        exchange.getIn().setBody("2+3");
    })
    .removeHeaders("*", "a", "b", "c", "d")
    .setHeader("JMSReplyTo", simple("bbb"))
        //send request
    .to(ExchangePattern.InOnly, "jms:ccc?preserveMessageQos=true&includeSentJMSMessageID=true")
        //trying to send headers from this route to bbb
    .to(ExchangePattern.InOut, "jms:bbb")
.end();
from("jms:bbb")
    .log("${headers}\n${}body")
.end();

из ("jms: bbb") это маршрут, по которому я хочу объединить результаты из.
.to(ExchangePattern.InOnly, "jms:ccc?preserveMessageQos=true& includeSentJMSMessageID=true").
и.
.to (ExchangePattern.InOut, "jms: bbb")

UPD:

public class AggregationStrategyImpl implements AggregationStrategy {

    private static final Logger log = LoggerFactory.getLogger(AggregationStrategyImpl.class);

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if(newExchange == null)
            newExchange = oldExchange;
        String a = oldExchange.getIn().getHeader("a", String.class);
        String b = oldExchange.getIn().getHeader("b", String.class);
        String c = oldExchange.getIn().getHeader("c", String.class);
        String d = oldExchange.getIn().getHeader("d", String.class);
        newExchange.getIn().setHeader("a", a);
        newExchange.getIn().setHeader("b", b);
        newExchange.getIn().setHeader("c", c);
        newExchange.getIn().setHeader("d", d);

        return newExchange;
    }
}

 .setHeader("JMSReplyTo", simple("bbb"))
                    .multicast(aggregationStrategy)
                    .to(ExchangePattern.InOnly, "jms:ccc?preserveMessageQos=true&includeSentJMSMessageID=true")
                    .to(ExchangePattern.OutOnly, "jms:bbb")

Ответы [ 2 ]

0 голосов
/ 28 мая 2019

См. здесь .Похоже, ваш первый оператор if не работает:

if(newExchange == null) 
  newExchange = oldExchange;

Таким образом, новый входящий объект обмена всегда будет оставаться нулевым.Вместо этого все должно быть иначе, потому что вы хотите сохранить oldExchange, когда приходит newExchange:

if (oldExchange == null) {
    //...
    return newExchange;
} else {
    //...
    return oldExchange;
}

Проблема заключается в том, что вы ожидаете, пока newExchange не будетноль, но всегда - это newExchange, если вызывается агрегат.Самый первый объект обмена, который появится, тоже newExchange.Вместо этого первый вызов агрегата не имеет oldExchange!

0 голосов
/ 24 мая 2019
                .log("incoming message")
                .process(exchange -> {
                    exchange.getIn().setHeader("c", c);
                    exchange.getIn().setHeader("d", d);
                    exchange.getIn().setHeader("a", a);
                    exchange.getIn().setHeader("b", b);
                    exchange.getIn().setBody("2+3");
                })
                .removeHeaders("*", "a", "b", "c", "d")
                .setHeader("JMSReplyTo", simple("bbb"))
                //send request
                .multicast(aggregateStratergeyBean)
                .to(ExchangePattern.InOnly, "jms:ccc?preserveMessageQos=true&includeSentJMSMessageID=true")
                //trying to send headers from this route to bbb
                .to(ExchangePattern.InOut, "jms:bbb")
                .end();
        from("jms:bbb")
                .log("${headers}\n${}body")
                .end();```

aggregateStratergeyBean should implements AggregationStrategy. 

Also you can use many options like below

.parallelProcessing(true)   // Parallel process both routes

.parallelAggregate()  // Aggregate parallel , you need to handle thread safety

.streaming(). // Process response as and when it get the response not in the order it invoked.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...