Отправлять результаты агрегации по основному маршруту, а не по входным сообщениям. - PullRequest
0 голосов
/ 08 февраля 2019

Я хочу получить результат агрегации как основное сообщение маршрута, а не исходные сообщения, поступившие на IN.Я также хочу сделать это одним маршрутом.

Я знаю, что могу использовать .to("direct:one_result") после агрегации, но у меня есть серьезные ограничения на выполнение этого в одном маршруте, потому что я генерирую маршруты динамически.

Мой .to("mock:out") будет заменен более длинным определением маршрута.

    from("direct:in").routeId("TEST_AGGREGATION_ROUTE")
    .log("<IN> ${body}")
    .aggregate(header("THE_ID"), (oldExchange, newExchange) -> {
        final List<Object> body;
        final Exchange outExchange;

        if (oldExchange == null) {
            outExchange = newExchange;
            body = new ArrayList<>();
            body.add(newExchange.getIn().getBody());
        } else {
            outExchange = oldExchange;
            body = oldExchange.getIn().getBody(List.class);
            body.add(newExchange.getIn().getBody());
        }

        outExchange.getIn().setBody(body);
        return outExchange;
              })
        .completionSize(4)
        .completionTimeout(30000)

        .log("<AGGREGATION> size = ${body.size}") // HERE I GET THE AGGREGATION RESULT
    .end()
    .log("<OUT> ${body}") // HERE I GET THE INPUT MESSAGES
    .to("mock:out")
    ;

Результат теста выглядит следующим образом:

TEST_AGGREGATION_ROUTE - <IN> BODY1
TEST_AGGREGATION_ROUTE - <OUT> BODY1

TEST_AGGREGATION_ROUTE - <IN> BODY2
TEST_AGGREGATION_ROUTE - <OUT> BODY2

TEST_AGGREGATION_ROUTE - <IN> BODY3
TEST_AGGREGATION_ROUTE - <OUT> BODY3

TEST_AGGREGATION_ROUTE - <IN> BODY4
TEST_AGGREGATION_ROUTE - <AGGREGATION> size = 4
TEST_AGGREGATION_ROUTE - <OUT> BODY4

1 Ответ

0 голосов
/ 09 февраля 2019

В вашей маршрутизации есть ошибка.Не следует обрабатывать окончательные результаты агрегации «вне цикла», а в под маршруте.Не помещайте никаких операторов после вашего end ().

from("direct:in")
    ...
    .aggregate(header("THE_ID"), (oldExchange, newExchange) -> {...})
       .completionSize(4)
       .completionTimeout(30000)
       .to("direct:processAggregation")
     .end();

from("direct:processAggregation")
    .log("<AGGREGATION> size = ${body.size}") 
    .log("<OUT> ${body}");

Как только агрегат достигнет своего размера завершения, весь агрегат будет отправлен на самое первое следующее "к (...) "конечная точка.То, что вы хотите сделать с каждым агрегатом, должно быть смоделировано по отдельному маршруту.

...