Как включить транзакцию с активным верблюжьим компонентом - PullRequest
0 голосов
/ 16 мая 2018

Во время использования сообщения из очереди JMS я хочу решить, является ли процесс потребления успешным или неудачным. В случае сбоя я хочу, чтобы activemq повторно доставил сообщение через некоторое время, например, через 1 минуту.Позже, основываясь на количестве доставок, я могу отправить его в DLQ.

Я нашел примеры для достижения того же с помощью Java-кода, например сеанс с другим режимом подтверждения или выполнение сеанса, но не смог понять, как этого добиться с файлом чертежа..

     <reference id="testIdempotencyStore"
           interface="javax.sql.DataSource"
           filter="(osgi.jndi.service.name=TestContext)">
</reference>

<bean id="simulatorMessages" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
    <argument ref="testIdempotencyStore" />
    <argument value="jmsTest" />
</bean>

<reference id="txMgr" interface="javax.transaction.TransactionManager" />

<bean id="xaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL" value="${activemq.url}" />
    <property name="watchTopicAdvisories" value="false" />
    <property name="userName" value="${activemq.user}" />
    <property name="password" value="${activemq.password}" />
    <property name="redeliveryPolicy" >
        <bean class="org.apache.activemq.RedeliveryPolicy" >
            <property name="maximumRedeliveries" value="0" />
        </bean>
    </property>
</bean>

<bean id="jcaConnectionFactory" class="org.apache.activemq.jms.pool.JcaPooledConnectionFactory"
      init-method="start" destroy-method="stop">
    <property name="transactionManager" ref="txMgr"/>
    <property name="maxConnections" value="10" />
    <property name="name" value="amq" />
    <property name="connectionFactory" ref="xaConnectionFactory"/>
</bean>

<bean id="jmsTxConf" class="org.apache.activemq.camel.component.ActiveMQConfiguration">
    <property name="connectionFactory" ref="jcaConnectionFactory" />
    <property name="requestTimeout" value="10000" />
    <property name="transactionTimeout" value="30" />
    <property name="cacheLevelName" value="CACHE_NONE" />
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
    <property name="configuration" ref="jmsTxConf" />
</bean>

<camelContext id="TestContext-jms-dispatcher" trace="false" xmlns="http://camel.apache.org/schema/blueprint" >
    <route id="externalNotificationsDispatchRoute" >
        <from uri="activemq:queue:{{vqueue.name}}" />
        <idempotentConsumer messageIdRepositoryRef="simulatorMessages">
            <header>customId</header>
            <to uri="vm:notificationConsumer" />
        </idempotentConsumer>
    </route>
</camelContext>

1 Ответ

0 голосов
/ 16 мая 2018

Если вы этого еще не сделали, прочитайте эту главу , чтобы узнать больше о том, как транзакции JMS XA работают в JBoss Fuse.

Вам не хватает Spring PlatformTransactionManagerи несколько других вещей.Вот пример конфигурации, которая должна работать:

<!-- Service reference to Spring’s PlatformTransactionManager -->
<reference id=”osgiPlatformTransactionManager”
    interface=”org.springframework.transaction.PlatformTransactionManager”>

<!-- Service reference to JTA TransactionManager -->
<reference id=”osgiJtaTransactionManager”
    interface=”javax.transaction.TransactionManager”>

<!-- JMS TX endpoint -->
<bean id=”activemq” class=”org.apache.activemq.camel.component.ActiveMQComponent”>
    <property name=”configuration” ref=”jmsTxConfig”>
</bean>

<bean id=”jmsTxConfig” class=”org.apache.camel.component.jms.JmsConfiguration”>
    <property name=”connectionFactory” ref=”jmsXaPoolConnectionFactory” />
    <property name=”transactionManager” ref=”osgiPlatformTransactionManager” />
    <!-- transacted must be set to “false” for XA transactions -->
    <!-- as it is only meant to be used with local transactions -->
    <property name=”transacted” value=”false” />
</bean>

<!-- The connection factory wrapper to enable auto-enlisting -->
<bean id=”jmsXaPoolConnectionFactory”
    class=”org.apache.activemq.pool.JcaPooledConnectionFactory”>
    <property name=”name” value=”MyXaResource” />
    <property name=”maxConnections” value=”1” />
    <property name=”connectionFactory” ref=”jmsXaConnectionFactory” />
    <property name=”transactionManager” ref=”osgiJtaTransactionManager” />
</bean>

<bean id=”jmsXaConnectionFactory”
    class=”org.apache.activemq.ActiveMQXAConnectionFactory”>
    <property name=”brokerURL” value=”${activemq.url}” />
    <property name=”userName” value=”${activemq.user}” />
    <property name=”password” value=”${activemq.password}” />
    <property name="watchTopicAdvisories" value="false" />
    <property name="redeliveryPolicy" >
    <bean class="org.apache.activemq.RedeliveryPolicy" >
        <property name="maximumRedeliveries" value="0" />
    </bean>
</property>
</bean>

<!-- ActiveMQ XA Resource Manager -->
<bean id=”resourceManager” class=”org.apache.activemq.pool.ActiveMQResourceManager”
    init-method=”recoverResource”>
    <property name=”transactionManager” ref=”osgiJtaTransactionManager” />
    <property name=”connectionFactory” ref=”jmsXaConnectionFactory” />
    <property name=”resourceName” value=”activemq.default” />
</bean>

<!-- Transaction Policy to be referenced by the transacted-component in a route -->
<bean id=”txPolicy” class=”org.apache.camel.spring.spi.SpringTransactionPolicy”>
    <property name=”transactionManager” ref=”osgiPlatformTransactionManager” />
    <!-- PROPAGATION_REQUIRED is the default value if a behavior is not set -->
    <property name=”propagationBehaviorName” value=”PROPAGATION_REQUIRED” />
</bean>
...