Apache Camel AggregationStrategy.aggregate получает список вместо отдельных объектов - PullRequest
0 голосов
/ 16 мая 2019

У меня ниже верблюжий маршрут.

    from("{{trade-publisher.trade-tnc.source-endpoint}}")
        .doTry()
            .bean(clientApi, "search(${body},${header.region})")  //Returns List<Trade> 
            .split(simple("${body}"))
                .parallelProcessing()
                    .doTry()
                        .bean(clientApi,"enrich(${body})") //Passing Trade Object
                    .endDoTry()
                    .doCatch(Exception.class)
                        .log(LoggingLevel.ERROR, "ENRICHMENT-EXCEPTION : ${exception.stacktrace}")  
                    .end()  //End of Inner try catch
            .end()// End of split() and parallelProcessing()
            .aggregate(aggegrationStrategy)
            .exchange()
            .completionTimeout(30000L)
    ...
    ...

Но в моем приведенном ниже агрегате я получаю Список в newExchange? Разве он не должен передавать Trade объект вместо List<Trade>?

@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

Ответы [ 2 ]

2 голосов
/ 16 мая 2019

Нет, это по проекту , вы получите список после сплиттера. См. Документацию EIP Splitter и найдите параграф What the Splitter returns.

Начиная с версии Camel 2.3, возвращается исходное сообщение , которое было введено для Splitter. Если вы хотите что-то сделать с отдельными частями, вы должны сделать это внутри Splitter .

Ну, вы уже делаете это в

.bean(clientApi,"enrich(${body})")

Когда вы «закрываете» Splitter, вы можете продолжать работать с исходной полезной нагрузкой до Splitter. Это может быть очень удобно, но если вам это не нужно, маршрут обычно заканчивается после Splitter.

0 голосов
/ 20 мая 2019

По некоторым предположениям, ваша агрегация находится не в той точке маршрута, если вы хотите, чтобы агрегатор вызывался для каждого обмена, создаваемого разделителем.

from("{{trade-publisher.trade-tnc.source-endpoint}}")
    .doTry()
        .bean(clientApi, "search(${body},${header.region})")  //Returns List<Trade> 
        .split(simple("${body}"))
            .parallelProcessing()
            .aggregate(aggegrationStrategy)   <-- MOVE AGGREGATOR INSIDE SPLIT
            .completionTimeout(30000L)
                .doTry()
                    .bean(clientApi,"enrich(${body})") //Passing Trade Object
                .endDoTry()
                .doCatch(Exception.class)
                    .log(LoggingLevel.ERROR, "ENRICHMENT-EXCEPTION : ${exception.stacktrace}")  
                .end()  //End of Inner try catch
        .end()// End of split() and parallelProcessing()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...