Получить пакетные сообщения от ActiveMQ - PullRequest
0 голосов
/ 19 февраля 2020

Контекст Я создаю проект EI WSO2, в котором записываются входящие сообщения {"dest": "some_E_mail", "msg": "some message"} в очереди сообщений для последующего чтения последовательностями того же проекта EI. Вот общая форма линии:

|EI API sequence| --piling--> |ActiveMQ| --unpiling--> |EI sequence to call external service|

Унитарные сообщения отправляются в очереди сообщений со следующим посредником

<send>
  <endpoint>
      <address uri="jms:/JMSSmsPile?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=queue">
      </address>
  </endpoint>
</send>

и читаются из MQ со следующим inboundEndpoint :

<inboundEndpoint name="JMSSmsPile" onError="UnpileSmsError" protocol="jms" sequence="UnpileSmsSequence" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="interval">5000</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="transport.jms.Destination">JMSSmsPile</parameter>
        <parameter name="transport.jms.CacheLevel">3</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
        <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.SessionAcknowledgement">AUTO_ACKNOWLEDGE</parameter>
        <parameter name="transport.jms.SessionTransacted">false</parameter>
        <parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
        <parameter name="transport.jms.ContentType">application/json</parameter>
        <parameter name="transport.jms.SharedSubscription">false</parameter>
        <parameter name="transport.jms.ResetConnectionOnPollingSuspension">false</parameter>
    </parameters>
</inboundEndpoint>

работает как брелок для однократной отправки.

Проблема: агрегирование сообщений. Где это сделать?

Для одной из последовательностей " unpiling " мне нужно пакетных сообщений до 20 сообщений перед вызовом внешняя служба , но ни одно сообщение не должно ждать более 5 секунд для достижения этого максимума (т. е. если сообщений больше, чем 3 текущих сообщения, просто отправьте их). Я не уверен, как этого добиться.

Может ли это объединение быть реализовано с помощью ActiveMQ с предыдущими правилами? В этом контексте вместо json MQ будет отправлять list<json> каждые 5 секунд, а моей последовательности unpiling в конечном итоге просто потребуется разделить этот список на сегмент <20 для вызова внешней службы. Кажется возможным с использованием AsyncSend , но я не знаю, как / если я могу создать этот тип сообщения непосредственно из последовательностей WSO2.

Я не обязательно ищу ответ, ориентированный исключительно на WSO2. Все общие java / jms-входы по теме приветствуются.

Environment :

  • activeMQ 5.15
  • WSO2EI 6.5.0
...