Как убедиться, что сообщение из очереди JMS доставляется на внешний WebService (CXF)? - PullRequest
8 голосов
/ 24 января 2012

Вопрос

Как настроить ActiveMQ и <flow> в Mule ESB 3.2, чтобы убедиться, что сообщение, извлеченное из очереди, правильно обрабатывается внешним CXF service?

Сценарий

У меня есть конечная точка CXF, которая должна принять входящее сообщение и как можно скорее передать его трем внешним службам.Давайте назовем их EX1, EX2, EX3.Это довольно просто, благодаря компоненту <all>, представленному в Mule 3.x.

Самое важное требование всего решения - убедиться, что каждое полученное сообщение заканчиваетсядоставляется всем трем сервисам CXF.Таким образом, мы закончили с идеей помещать каждое входящее сообщение в Persistent JMS queues (Q1, Q2, Q3).После прочтения сообщения из очереди Qn оно передается непосредственно в соответствующую конечную точку EXn и, следовательно, на внешнюю службу.

Config

(я могу предоставить полную конфигурацию по запросу)

Мы настроили брокер ActiveMQ, как описано здесь , и подключили его к нашей конфигурации <flow>.Кажется, все работает, как и ожидалось, у меня есть JConsole, подключенная к моему приложению, так что я вижу, что сообщения имеют тип PERSISTENT и они попадают в правильные очереди.Если все идет гладко - сообщения принимаются всеми тремя службами EXn.

Тесты

Проблема возникает, когда мы отключаем одну из служб, скажем, EX2, иперезагрузите весь сервер, имитируя сбой. Сообщение в конечном итоге теряется (я полагаю, оно не такое постоянное, а?).Самое любопытное, что - если мы отправили 10 сообщений, когда EX2 не работает, после перезапуска сервера 9 из них будут правильно доставлены!Поэтому я думаю, что, может быть, просто возможно, 9 из этих 10 сообщений были должным образом помещены в очередь, в то время как одно постоянно доставлялось при сбое сервера.

Это заставляет меня думать, что конечная точка CXF не являетсябыть обращенным с поддержкой транзакций, что я не могу понять, честно говоря.В конце концов, я вижу, что сообщение находится в очереди, когда оно пытается быть доставлено, поэтому его следует сохранить.Это явно не так, но почему?

Мои собственные попытки Я пробовал несколько вещей, ни одна из которых не сработала.Всегда теряется одно сообщение.

  1. Не использовать теги <jms:transaction /> в потоках - не работает
  2. Запуск транзакции jms при получении сообщения, присоединение при отправке на <cxf:jaxws-client />
  3. Использование XA с JBoss и <xa-transaction /> - не сработало
  4. Предоставление <default-exception-strategy> конфигурации - если я вспомню, что все стало хуже

Любая помощьприветствуется, спасибо.

CONFIG

КОНФИГУРАЦИЯ ACTIVE MQ

<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
    <spring:property name="queue" value="queue.*"/>
    <spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>

<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
    <spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&amp;broker.persistent=true&amp;broker.useJmx=true"/>
    <spring:property name="redeliveryPolicy">
        <spring:bean class="org.apache.activemq.RedeliveryPolicy">
            <spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
            <spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
            <spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
            <spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
        </spring:bean>
    </spring:property>
</spring:bean>

<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
    <spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>

<spring:bean name="AmqBroker"
             class="org.apache.activemq.broker.BrokerService"
             init-method="start"
             destroy-method="stop">
    <spring:property name="brokerName" value="esb-amq-broker"/>
    <spring:property name="persistent" value="true"/>
    <spring:property name="dataDirectory" value="/home/bachman/activemq"/>
    <spring:property name="useJmx" value="true"/>
    <spring:property name="useShutdownHook" value="false"/>
    <spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
    <spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>

<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
                        numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
                        connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
                        disableTemporaryReplyToDestinations="true"/>

FLOW - отправлять входящее сообщениедо 3 очередей Qn

<flow name="dispatch-to-queues">
        <inbound-endpoint ref="incoming-cxf"/>

        <!-- Each received message ends up to be sent to all destinations -->
        <all>
            <jms:outbound-endpoint name="queue.q1"
                queue="queue.q1" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on Q1"
                    connector-ref="PersistentJMSConnector"/>

            <jms:outbound-endpoint name="queue.q2"
                queue="queue.q2" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on q2"
                connector-ref="PersistentJMSConnector" />

            <jms:outbound-endpoint name="queue.q3"
                queue="queue.q3" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on Q3"
                connector-ref="PersistentJMSConnector" />

        </all>
        <custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
    </flow>

FLOW - обрабатывать доставку от Qn до EXn

<flow name="from-q1-to-ex1">
        <jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
            disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
            doc:name="JMS" doc:description="Pull from q1."
            connector-ref="PersistentJMSConnector">
                <jms:transaction action="ALWAYS_BEGIN" />
        </jms:inbound-endpoint>
        <logger message="Sending message to EX-1" level="INFO" />

        <!-- Handle errors at this point in flow
        <custom-processor class="pl.exception.lookup.Component">
            <spring:property name="targetModuleName" value="Not-important"/>
        </custom-processor>
        -->


        <outbound-endpoint ref="ex1-cxf-endpoint">
            <jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
        </outbound-endpoint>
    </flow>

ENDPOINTS - объявление упомянутых конечных точек

<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
        <cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
    </endpoint> 

<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
        <cxf:jaxws-client
                clientClass="com.mycompany.services.Ex1"
                wsdlLocation="classpath:wsdl/ex1.wsdl"
                operation="someOperation"
                port="SomePort"/>
    </endpoint>

Ответы [ 3 ]

5 голосов
/ 24 января 2012

Потребление JMS-сообщений в транзакции является обязательным условием, чтобы решение работало должным образом: если на исходящей фазе CXF возникает исключение, сообщение JMS в конечном итоге будет откатано, затем доставлено, инициируя новый вызов CXF.

Необходимо тщательно настроить политику повторной доставки для своего клиента ActiveMQ, чтобы повторить достаточно много раз и, возможно, не слишком быстро (например, экспоненциальный откат). Вы также хотите обрабатывать DLQ соответственно. Клиентская конфигурация ActiveMQ с Spring Beans в Mule показана: http://www.mulesoft.org/mule-activemq-integration-examples

Также обязательно обращайтесь к нужному URL-адресу брокера в своей фабрике конфигурации. С вашим именем брокера esb-amq-broker ваша фабрика конфигурации должна быть:

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://esb-amq-broker"/>
    ...
2 голосов
/ 09 февраля 2012

Не знаю, смогу ли я вам чем-нибудь помочь, но вот несколько советов по вашей проблеме:

  • если бы вы пытались использовать другой диспетчер транзакций, кроме того, который предоставляется в Jboss, я бы предложил использовать Atomikos для таких тестов
  • , как предположил Дэвид, Транзакции кажутся лучшим подходом, но другим подходом было бы использование политики явного подтверждения .... Это может быть сложно настроить, но подход, подобный перехватчику, может отслеживать соединения с некоторыми конкретными конечными точками и отправлять Подтверждение возврата к вашему JMS-серверу может быть трудным, но оно определенно обеспечит правильную доставку сообщения ...

Удачи НТН Джером

1 голос
/ 10 февраля 2012

Не уверен, поможет ли это рассмотрение, но как насчет режимов подтверждения? Возможно ли, что сообщение уже было доставлено (в режиме автоматического подтверждения), но еще не было должным образом обработано конечной точкой потребляющей службы?

Не знаю, как настроить явное подтверждение в этом сценарии, но, возможно, стоит продолжить исследование.

...