Вопрос
Как настроить ActiveMQ
и <flow>
в Mule ESB 3.2
, чтобы убедиться, что сообщение, извлеченное из очереди, правильно обрабатывается внешним CXF service
?
Сценарий
У меня есть конечная точка CXF, которая должна принять входящее сообщение и как можно скорее передать его трем внешним службам.Давайте назовем их EX1, EX2, EX3.Это довольно просто, благодаря компоненту <all>
, представленному в Mule 3.x.
Самое важное требование всего решения - убедиться, что каждое полученное сообщение заканчиваетсядоставляется всем трем сервисам CXF.Таким образом, мы закончили с идеей помещать каждое входящее сообщение в Persistent JMS queues
(Q1, Q2, Q3).После прочтения сообщения из очереди Qn оно передается непосредственно в соответствующую конечную точку EXn и, следовательно, на внешнюю службу.
Config
(я могу предоставить полную конфигурацию по запросу)
Мы настроили брокер ActiveMQ, как описано здесь , и подключили его к нашей конфигурации <flow>
.Кажется, все работает, как и ожидалось, у меня есть JConsole, подключенная к моему приложению, так что я вижу, что сообщения имеют тип PERSISTENT и они попадают в правильные очереди.Если все идет гладко - сообщения принимаются всеми тремя службами EXn.
Тесты
Проблема возникает, когда мы отключаем одну из служб, скажем, EX2, иперезагрузите весь сервер, имитируя сбой. Сообщение в конечном итоге теряется (я полагаю, оно не такое постоянное, а?).Самое любопытное, что - если мы отправили 10 сообщений, когда EX2 не работает, после перезапуска сервера 9 из них будут правильно доставлены!Поэтому я думаю, что, может быть, просто возможно, 9 из этих 10 сообщений были должным образом помещены в очередь, в то время как одно постоянно доставлялось при сбое сервера.
Это заставляет меня думать, что конечная точка CXF не являетсябыть обращенным с поддержкой транзакций, что я не могу понять, честно говоря.В конце концов, я вижу, что сообщение находится в очереди, когда оно пытается быть доставлено, поэтому его следует сохранить.Это явно не так, но почему?
Мои собственные попытки Я пробовал несколько вещей, ни одна из которых не сработала.Всегда теряется одно сообщение.
- Не использовать теги
<jms:transaction />
в потоках - не работает - Запуск транзакции jms при получении сообщения, присоединение при отправке на
<cxf:jaxws-client />
- Использование XA с JBoss и
<xa-transaction />
- не сработало - Предоставление
<default-exception-strategy>
конфигурации - если я вспомню, что все стало хуже
Любая помощьприветствуется, спасибо.
CONFIG
КОНФИГУРАЦИЯ ACTIVE MQ
<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
<spring:property name="queue" value="queue.*"/>
<spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>
<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
<spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>
<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
<spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&broker.persistent=true&broker.useJmx=true"/>
<spring:property name="redeliveryPolicy">
<spring:bean class="org.apache.activemq.RedeliveryPolicy">
<spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
<spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
<spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
<spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
</spring:bean>
</spring:property>
</spring:bean>
<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
<spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>
<spring:bean name="AmqBroker"
class="org.apache.activemq.broker.BrokerService"
init-method="start"
destroy-method="stop">
<spring:property name="brokerName" value="esb-amq-broker"/>
<spring:property name="persistent" value="true"/>
<spring:property name="dataDirectory" value="/home/bachman/activemq"/>
<spring:property name="useJmx" value="true"/>
<spring:property name="useShutdownHook" value="false"/>
<spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
<spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>
<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
disableTemporaryReplyToDestinations="true"/>
FLOW - отправлять входящее сообщениедо 3 очередей Qn
<flow name="dispatch-to-queues">
<inbound-endpoint ref="incoming-cxf"/>
<!-- Each received message ends up to be sent to all destinations -->
<all>
<jms:outbound-endpoint name="queue.q1"
queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q1"
connector-ref="PersistentJMSConnector"/>
<jms:outbound-endpoint name="queue.q2"
queue="queue.q2" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on q2"
connector-ref="PersistentJMSConnector" />
<jms:outbound-endpoint name="queue.q3"
queue="queue.q3" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q3"
connector-ref="PersistentJMSConnector" />
</all>
<custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
</flow>
FLOW - обрабатывать доставку от Qn до EXn
<flow name="from-q1-to-ex1">
<jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Pull from q1."
connector-ref="PersistentJMSConnector">
<jms:transaction action="ALWAYS_BEGIN" />
</jms:inbound-endpoint>
<logger message="Sending message to EX-1" level="INFO" />
<!-- Handle errors at this point in flow
<custom-processor class="pl.exception.lookup.Component">
<spring:property name="targetModuleName" value="Not-important"/>
</custom-processor>
-->
<outbound-endpoint ref="ex1-cxf-endpoint">
<jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
</outbound-endpoint>
</flow>
ENDPOINTS - объявление упомянутых конечных точек
<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
<cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
</endpoint>
<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
<cxf:jaxws-client
clientClass="com.mycompany.services.Ex1"
wsdlLocation="classpath:wsdl/ex1.wsdl"
operation="someOperation"
port="SomePort"/>
</endpoint>