Дублированные сообщения на ActiveMQ - PullRequest
3 голосов
/ 01 марта 2011

Я использую ActiveMQ в качестве JMS-брокера и потребителя, jmsTemplate для отправки сообщений, 1 недолговременная тема на данный момент. В пиковое время у меня ~ 100 сообщений в секунду.

Неважно, сколько сообщений в очереди, но я часто получаю дублированные сообщения. Временное решение, которое я придумала, - настроить индекс для таблицы - на данный момент все сообщения сохраняются только в базе данных.

Мой первый вопрос - почему сообщения дублируются, если я указал недолговечную тему и ответ не требуется?

Отправитель:

@Component
public class QueueSender 
{
    private Logger log = Logger.getLogger(getClass());
@Autowired
    protected JmsTemplate jmsTemplate;


    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    @Autowired
    public QueueSender( final JmsTemplate jmsTemplate )
    {
        this.jmsTemplate = jmsTemplate;
        this.jmsTemplate.setDeliveryPersistent(false);
        System.out.println("isSessionTransacted "+jmsTemplate.isSessionTransacted()+
                " getDeliveryMode "+jmsTemplate.getDeliveryMode()+
                " getReceiveTimeout "+jmsTemplate.getReceiveTimeout()+
                " getSessionAcknowledgeMode "+jmsTemplate.getSessionAcknowledgeMode());
    }


    public void sendPrice(Integer tickerId, Integer field, Double price, Long timestamp)
    {
        jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        jmsTemplate.setMessageIdEnabled(true);
        Map <String, Object>map = new HashMap<String, Object>();
        map.put("tickerId", tickerId);
        map.put("field", field);
        map.put("price", price);
        map.put("timestamp", timestamp);
        jmsTemplate.convertAndSend("Quotez", map);
    }

    public void sendVolume(Integer tickerId, Integer field, Integer size, Long timestamp)
    {
        jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        Map <String, Object>map = new HashMap<String, Object>();
        map.put("tickerId", tickerId);
        map.put("field", field);
        map.put("size", size);
        map.put("timestamp", timestamp);
        jmsTemplate.convertAndSend("Quotez", map);

    }

}

Слушатель:

public void onMessage(Message message) 
{
     if (message instanceof MapMessage) 
     {           
         try
         {
             MapMessage mapMessage = (MapMessage) message;
                 if(null !=  mapMessage.getString("price"))
                 {
 priceService.insert(mapMessage.getInt("tickerId"),mapMessage.getDouble("price"),
mapMessage.getInt("field"),mapMessage.getLong("timestamp"));
                 }                     else{
volumeService.insert(mapMessage.getInt("tickerId"),mapMessage.getInt("size"),
mapMessage.getInt("field"),mapMessage.getLong("timestamp"));
             }
         }
         catch (final JMSException e)
         {
             exceptionListener.onException(e);
         }
     }
}

Spring:

<amq:broker useJmx="true" persistent="false">
<amq:transportConnectors>
  <amq:transportConnector uri="tcp://localhost:0"/>
</amq:transportConnectors> </amq:broker>
<amq:topic id="topicDest"  physicalName="Quotez"/>
  <amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost?jms.watchTopicAdvisories=false"/>  
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="jmsFactory" />
<property name="exceptionListener" ref="jmsExceptionListener" />
<property name="sessionCacheSize" value="100" />
</bean>


<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
   <constructor-arg ref="connectionFactory"/>
    <property name="pubSubDomain" value="true"/>
 <property name="defaultDestinationName" value="Quotez"/>    
</bean>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="topicDest"/>
        <property name="messageListener" ref="jdbcListener" />
    </bean>

Второй вопрос касается конфигурации jmsContainer. В чем разница между кодом выше и кодом ниже? Код выше дает мне тему как подписчик, а код ниже дает мне очередь.

<jms:listener-container concurrency="10" connection-factory="connectionFactory">    
<jms:listener id="JdbcListener" destination="topicDest" ref="queueListener" />  
</jms:listener-container>

Я обнаружил, что Camel и его idempotentConsumer предполагают решение проблемы дублирования - конечно, было бы неплохо узнать, почему это происходит в первую очередь. Третий вопрос касается конфигурации верблюда. У меня есть эта конфигурация (по умолчанию):

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://localhost:0"/>
</bean>

<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
    <from uri="direct:start"/>
    <idempotentConsumer messageIdRepositoryRef="myRepo">
        <header>messageId</header>
        <to uri="mock:result"/>
    </idempotentConsumer>
</route>
</camelContext>

Это применимо ко всем очередям или я должен сделать явную подписку? Я предполагаю, что он проверит каждую тему / очередь и все входящие сообщения. Проблема на данный момент в том, что все сообщения имеют messageId = null и фильтр принимает его в качестве параметра.

2011-03-01 11:24:09,152 DEBUG (org.springframework.jms.core.JmsTemplate:567) - Sending created message: ActiveMQMapMessage {commandId = 0, responseRequired = false, **messageId = null**, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {field=1, timestamp=1298975049138, price=72.89, tickerId=2} }

Я не нашел простой способ установить messageId. Мой вопрос - достаточно ли установить messageId, и он будет работать как исключение, или что-то не так с конфигурацией, например, я должен указать, какая тема будет использоваться.

Спасибо,

Dzidas

1 Ответ

4 голосов
/ 08 июля 2011

при использовании темы JMS необходимо установить для одновременных / максимальных одновременных потребителей значение «1», иначе вы получите дубликаты. если вам нужно многопоточное потребление и / или балансировка нагрузки, используйте взамен виртуальные темы .

...