Контекст Я создаю проект EI WSO2, в котором записываются входящие сообщения {"dest": "some_E_mail", "msg": "some message"}
в очереди сообщений для последующего чтения последовательностями того же проекта EI. Вот общая форма линии:
|EI API sequence| --piling--> |ActiveMQ| --unpiling--> |EI sequence to call external service|
Унитарные сообщения отправляются в очереди сообщений со следующим посредником
<send>
<endpoint>
<address uri="jms:/JMSSmsPile?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://localhost:61616&transport.jms.DestinationType=queue">
</address>
</endpoint>
</send>
и читаются из MQ со следующим inboundEndpoint :
<inboundEndpoint name="JMSSmsPile" onError="UnpileSmsError" protocol="jms" sequence="UnpileSmsSequence" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
<parameters>
<parameter name="interval">5000</parameter>
<parameter name="sequential">true</parameter>
<parameter name="coordination">true</parameter>
<parameter name="transport.jms.Destination">JMSSmsPile</parameter>
<parameter name="transport.jms.CacheLevel">3</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
<parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
<parameter name="transport.jms.SessionAcknowledgement">AUTO_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.SessionTransacted">false</parameter>
<parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
<parameter name="transport.jms.ContentType">application/json</parameter>
<parameter name="transport.jms.SharedSubscription">false</parameter>
<parameter name="transport.jms.ResetConnectionOnPollingSuspension">false</parameter>
</parameters>
</inboundEndpoint>
работает как брелок для однократной отправки.
Проблема: агрегирование сообщений. Где это сделать?
Для одной из последовательностей " unpiling " мне нужно пакетных сообщений до 20 сообщений перед вызовом внешняя служба , но ни одно сообщение не должно ждать более 5 секунд для достижения этого максимума (т. е. если сообщений больше, чем 3 текущих сообщения, просто отправьте их). Я не уверен, как этого добиться.
Может ли это объединение быть реализовано с помощью ActiveMQ с предыдущими правилами? В этом контексте вместо json
MQ будет отправлять list<json>
каждые 5 секунд, а моей последовательности unpiling в конечном итоге просто потребуется разделить этот список на сегмент <20
для вызова внешней службы. Кажется возможным с использованием AsyncSend , но я не знаю, как / если я могу создать этот тип сообщения непосредственно из последовательностей WSO2.
Я не обязательно ищу ответ, ориентированный исключительно на WSO2. Все общие java / jms-входы по теме приветствуются.
Environment :
- activeMQ 5.15
- WSO2EI 6.5.0