ActiveMQ: правильная конфигурация как с очередями (с одновременными потребителями), так и с темами - PullRequest
2 голосов
/ 27 марта 2012

У нас есть конфигурация ActiveMQ / Camel, которая ранее использовала исключительно очереди сообщений с одновременными потребителями.

Однако теперь мы представляем темы сообщений и выясняем, что из-за одновременных потребителей - сообщенияполученные в теме потребляются несколько раз.

Какая правильная конфигурация для этого сценария?

то есть, мы хотим, чтобы несколько одновременных потребителей для сообщений, полученных в очереди, но был определен только один потребительдля сообщений, полученных по теме.

Вот текущая конфигурация:

<amq:connectionFactory id="amqConnectionFactory"
    useAsyncSend="true" brokerURL="${${ptl.Servername}.jms.cluster.uri}"
    userName="${jms.username}" password="${jms.password}" sendTimeout="1000"
    optimizeAcknowledge="true" disableTimeStampsByDefault="true">
</amq:connectionFactory>

<bean id="cachingConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
    <property name="cacheConsumers" value="true"></property>
    <property name="cacheProducers" value="true"></property>
    <property name="reconnectOnException" value="true"></property>
    <property name="sessionCacheSize" value="${jms.sessioncachesize}"></property>
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="transacted" value="false" />
    <property name="concurrentConsumers" value="${jms.concurrentConsumer}" />
    <property name="maxConcurrentConsumers" value="${jms.max.concurrentConsumer}" />
    <property name="preserveMessageQos" value="true" />
    <property name="timeToLive" value="${jms.timeToLive}" />
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig" />
</bean>

Ответы [ 2 ]

3 голосов
/ 27 марта 2012

вы можете явно установить concurrentConsumers / maxConcurrentConsumers на «1» для любых потребителей тем.

from("activemq:topic:myTopic?concurrentConsumers=1&maxConcurrentConsumers=1")...  

альтернативно, установите для свойств JmsConfiguration concurrent / maxConcurrentConsumers значение «1», а затем явно разрешите одновременное потребление для очередей как

from("activemq:queue:myQueue?maxConcurrentConsumers=5")...  

также вы можете использовать Виртуальные темы для одновременного использования сообщений темы без дублирования (настоятельно рекомендуется по сравнению с традиционными темами)

0 голосов
/ 27 марта 2012

В итоге я решил создать отдельный блок конфигурации jmsConfig / activeMQ.

Общая конфигурация выглядит следующим образом:

 <!-- This is appropriate for consuming Queues, but not topics.  For topics, use
jmsTopicConfig / activemqTopics -->
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="transacted" value="false" />
    <property name="concurrentConsumers" value="${jms.concurrentConsumer}" />
    <property name="maxConcurrentConsumers" value="${jms.max.concurrentConsumer}" />
    <property name="preserveMessageQos" value="true" />
    <property name="timeToLive" value="${jms.timeToLive}" />
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig" />
</bean>

<!-- This config limits to a single concurrent consumer.  This config is appropriate for
consuming Topics, not Queues. -->
<bean id="jmsTopicConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="transacted" value="false" />
    <property name="concurrentConsumers" value="1" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="preserveMessageQos" value="true" />
    <property name="timeToLive" value="${jms.timeToLive}" />
</bean>

<bean id="activemqTopics" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsTopicConfig" />
</bean>

Затем в верблюжьем конвейере, потребляющемтема от bean-компонента activemqTopics выглядит следующим образом:

<camel:route id="myTopicResponder">
    <camel:from uri="activemqTopics:topic:stockQuotes?concurrentConsumers=1" />
    <camel:to uri="bean:stockQuoteResponder?method=saveStockQuote"/>
</camel:route>
...