Как разделить агрегированный обмен GroupedExchangeAggregationStrategy на исходные биржи? - PullRequest
0 голосов
/ 09 апреля 2019

После агрегации обменов с использованием GroupedExchangeAggregationStrategy мне нужно разделить их обратно (разделить отдельные метрики времени обработки) на исходные обмены.

Я попытался выполнить разбиение с помощью следующего, но полученный разделенный обмен оборачивает исходный обмен и помещает его в тело Message.

Можно ли разделить совокупный обмен GroupedExchangeAggregationStrategy на исходные обмены без обмена оболочкой? Мне нужно использовать оригинальные свойства exchange и я хотел бы сделать это с выражением SpEL.

.aggregate(constant(true), myGroupedExchangeAggregationStrategy)
    .completionInterval(1000)
    .completeAllOnStop()
    .process { /* do stuff */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
    .to(/* micrometer timer metric using SpEL expression */)
    // ^- the resulting split exchange is wrapped in another exchange

В случае, если это в настоящее время не поддерживается, я пытаюсь найти лучший способ реализовать это поведение самостоятельно, не создавая пользовательский процессор Splitter для этой единственной функции. Я надеялся каким-то образом переопределить SplitterIterable, который выполняет упаковку, но это не представляется возможным.

Ответы [ 2 ]

1 голос
/ 10 апреля 2019

Да, GroupedExchangeAggregationStrategy ничего не делает, кроме как создает java.util.List из всех Обменов .С другой стороны, Splitter EIP разделяет список по элементам и помещает элемент в тело сообщения.Поэтому в итоге вы получите Exchange, который содержит Exchange в своем теле.

Вам нужна AggregationStrategy, которая собирает все объекты тела в списке вместо всех обменов.

Вы можете попытаться использовать Camels FlexibleAggregationStrategy, который настраивается через свободный API .

new FlexibleAggregationStrategy () .storeInBody () .accumulateInCollection (ArrayList.class) .pick (new SimpleExpression ("$ {body}"));

Это должно создать AggregationStrategy, которая извлекает тело каждого сообщения (возможно, вы можете опустить метод выбора, так как извлечение тела является выбором по умолчанию), собирает их в список и сохраняет агрегацию в теле сообщения.

Чтобы снова разбить эту совокупность, достаточно простого split(body()).

РЕДАКТИРОВАТЬ из-за комментария

Да, вы правы, побочным эффектом моего решения является то, что вы теряете свойства и заголовки исходных сообщений, поскольку оно объединяет толькотел сообщений.

То, что вы хотите сделать, это разделить Список бирж обратно на оригиналы.то есть Splitter не должен создавать новые биржи, а должен использовать уже существующие и выбрасывать агрегированную оболочку Exchange.

Насколько я вижу в исходном коде Splitter , это в настоящее время невозможно :

Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
...
if (part instanceof Message) {
    newExchange.setIn((Message) part);
} else {
    Message in = newExchange.getIn();
    in.setBody(part);
}
0 голосов
/ 12 апреля 2019

Согласно принятому ответу, он не поддерживается изначально.

Этот пользовательский процессор развернет разделенный обмен (т. Е. Копирует вложенный обмен Message и свойства вкорневой обмен).Развернутый обмен будет почти идентичен оригиналу - он сохранит все не конфликтующие свойства из корневого обмена (например, Splitter связанные свойства, такие как разделенный индекс и т. Д.)

class ExchangeUnwrapper : Processor {

    override fun process(exchange: Exchange) {
        val wrappedExchange = exchange.`in`.body as Exchange
        ExchangeHelper.copyResultsPreservePattern(exchange, wrappedExchange)
    }
}

// Route.kt

from(...)
.aggregate(...)
.process { /* do things with aggregate */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
  .process(ExchangeUnwrapper())
  .process { /* do something with the original exchange */ }
.end()
...