Spring Integration: Scatter-Gather, Gather не работает после параллельных вызовов веб-сервисов - PullRequest
1 голос
/ 10 июля 2019

Я настраиваю поток весенней интеграции, чтобы получать сообщения из MQ и выполнять параллельные вызовы вызовов веб-служб, создавая запросы, созданные из сообщений MQ.

Ниже показано, как выглядит поток весенней интеграции

  • Извлечение сообщения из IBM MQ, преобразование сообщения с использованием Marshaller и сохранение объекта в БД.
  • Отправка сохраненного объекта в канал Scatter-Gather.
  • Scatter-Канал сбора имеет два канала распространения, каждый канал распространения представляет собой цепочку со следующими компонентами

    1. Клиент Webservice для выполнения вызова веб-службы (Service Activator)
    2. Transformer для преобразования ответа в объект Entity Object
    3. Обработчик для сохранения данных в БД.
  • Сбор ответа от параллельных вызовов веб-службы и отправка нового объекта, созданного из двух параллельных вызовов веб-службы, в RabbitMQ.

Я могу выполнять параллельные вызовы веб-службы из шаблона рассеяния, но не вижу агрегированияn происходит в моем шаблоне сбора, в основном поток не поступает в класс gatherer.

Я пробовал канал Publish-Subscribe с Task-Executor в качестве входного канала для шаблона Scatter-Gather и, согласно журналам, вызовы веб-службы происходят параллельно сдва исполнителя задач, но он никогда не достигает собирателя после вызова веб-службы.

<si:service-activator input-channel="transformedEntity"
        ref="incidentHandler" output-channel="outputChannelFromMQ" />

<si:scatter-gather input-channel="outputChannelFromMQ" 
        requires-reply="false" output-channel="gatherResponseOutputChannel" gather-channel="gatherChannel" gather-timeout="4000">
        <si:scatterer apply-sequence="true">
            <si:recipient channel="distributionChannel1" />
            <si:recipient channel="distributionChannel2" />
        </si:scatterer>         
</si:scatter-gather>

<si:publish-subscribe-channel id="outputChannelFromMQ" apply-sequence="true" 
        task-executor="taskExecutor" />

<task:executor id="taskExecutor" queue-capacity="25"    pool-size="10-10" />

<si:chain id="planngedBagsChain" input-channel="distributionChannel1"
        output-channel="gatherChannel">
    <si:service-activator ref="webServiceClient1" method="getResponse" />
    <si:service-activator ref="serviceHandler1"     method="saveToDB" />
</si:chain>

<si:chain id="bagHistoryChain" input-channel="distributionChannel2"
    output-channel="gatherChannel">
    <si:service-activator ref="webServiceClient2" method="getResponse" />
    <si:transformer ref="transformer" />
    <si:service-activator ref="serviceHandler2" method="saveToDB" />
</si:chain>

<si:service-activator input-channel="gatherResponseOutputChannel"
    ref="responseTransformer" method="receiveResponse" output-channel="toRabbitMQ" />

1 Ответ

0 голосов
/ 11 июля 2019

После включения журналов отладки, можно узнать все каналы, на которые он отправляется. С вышеупомянутой конфигурацией это не собирается к правильному собирателю. Смена конфигурации на приведенную ниже сработала.

<si:service-activator input-channel="transformedEntity"
        ref="incidentHandler" output-channel="outputChannelFromMQ" />

<si:channel id="outputChannelFromMQ"></si:channel>

<si:scatter-gather input-channel="outputChannelFromMQ"
         requires-reply="false" scatter-channel="scatterInputChannel" output-channel="toRabbit"  
         gather-channel="gatherChannel" gather-timeout="4000">
        <si:gatherer id="responseGatherer"  ref="responseTransformer" release-strategy-expression="size() == 2"/>
    </si:scatter-gather>

<si:publish-subscribe-channel id="scatterInputChannel" apply-sequence="true" 
        task-executor="taskExecutor" />

<task:executor id="taskExecutor" queue-capacity="25"    pool-size="10-10" />

<si:chain id="planngedBagsChain" input-channel="scatterInputChannel"
        output-channel="gatherChannel">
    <si:service-activator ref="webServiceClient1" method="getResponse" />
    <si:service-activator ref="serviceHandler1"     method="saveToDB" />
</si:chain>

<si:chain id="bagHistoryChain" input-channel="scatterInputChannel"
    output-channel="gatherChannel">
    <si:service-activator ref="webServiceClient2" method="getResponse" />
    <si:transformer ref="transformer" />
    <si:service-activator ref="serviceHandler2" method="saveToDB" />
</si:chain>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...