Activemq, как настроить потребительский слушатель (на Java) - PullRequest
4 голосов
/ 29 января 2012

Я был бы рад, если бы кто-нибудь поделился простым примером конфигурации XML-приемника для приемника на основе XML в Spring.Заранее спасибо.

EDIT;

Я просто хотел бы услышать о слушателе потребителя, а не о реализации потребителя, потому что я уже внедрил активную-mq в моем приложении.и он работает хорошо, однако я не могу быть уверен в порядке потребления элементов, которые отправляются производителем синхронно.Проблема заключается в несогласованности и манипулировании данными из-за асинхронного выполнения метода (сохранение некоторых объектов в БД для их регистрации) от одновременных потребителей одновременно.

РЕДАКТИРОВАТЬ 2: Позвольте мне уточнить эту сложность.У меня есть приложение, которое состоит из двух базовых отдельных частей.Первый - это синхронно исполняющийся Producer, который запрашивает у db информацию о новых продуктах, а затем «один за другим» отправляет им метод jmsTemplate.send, предоставленный active-mq.Это была операция, которая синхронно выполняется из Cron / Timer.Другими словами, продюсер выполняется из таймера / хрон.Теперь проблема заключается в самом потребителе.Когда производитель отправляет продукты «один за другим», асинхронные потребители (с включенным параллелизмом) получают продукты и потребляют их асинхронно.

Проблема начинается здесь.Потому что метод, который выполняется от потребителя, когда продукты только что получены, выполняет некоторые операции по сохранению БД.Когда один и тот же продукт принимается отдельными параллельными потребителями (это происходит из-за нашей системы, а не из-за проблемы jms, не обращайте внимания на этот момент), то при выполнении одних и тех же операций персистентности с одним и тем же объектом возникают некоторые исключения, которые легко предсказать.Как я могу предотвратить асинхронные операции с продуктами или управлять заказами потребляемых продуктов в таких приложениях.

Спасибо.

1 Ответ

7 голосов
/ 29 января 2012
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:jms="http://www.springframework.org/schema/jms"
   xmlns:p="http://www.springframework.org/schema/p"
   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                       http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

<!-- A simple and usual connection to activeMQ -->  
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"></property>
</bean>                

 <!-- A POJO that implements the JMS message listener -->
<bean id="simpleMessageListener" class="MyJMSMessageListener" />


<!-- Cached Connection Factory to wrap the ActiveMQ connetion -->
<bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">     
    <property name="targetConnectionFactory" ref="activeMQConnectionFactory"></property>
    <property name="sessionCacheSize" value="10"></property>
    <property name="reconnectOnException" value="true"></property>
</bean>


<!-- The Spring message listener container configuration -->
<jms:listener-container container-type="default" connection-factory="cachedConnectionFactory" acknowledge="auto">
    <jms:listener destination="FOO.TEST" ref="simpleMessageListener" method="onMessage" />
</jms:listener-container>

</beans>

И класс Java, который слушает сообщение itslef:

import javax.jms.Message;
import javax.jms.MessageListener;
public class MyJMSMessageListener implements MessageListener{

    @Override
    public void onMessage(Message message) {
    // Do your work here
    }

}

Для запуска этого прослушивателя нужно получить контекст приложения, он автоматически запустит прослушиватель JMS, как только вы это сделаете.

РЕДАКТИРОВАТЬ в соответствии с другим вопросом :

Таким образом, ваша система может генерировать (например) 2 или даже больше сообщений, доставляемых потребителям с одним и тем же productID? Ну, во-первых, это скорее не ВАША проблема, а проблема приложения. Даже если вы как-то это исправите, это не совсем исправление, но это способ скрыть саму проблему. Тем не менее, если я вынужден предоставить решение, прямо сейчас, я могу подумать только об одном, самом простом, отключении одновременного потребления, вроде. Вот что я хотел бы сделать: получать сообщения в очереди и иметь только одного получателя в этой очереди. Внутри этого потребителя я бы обработал как можно меньше сообщений - взял ТОЛЬКО productID и поместил его в другую очередь. Перед этим вы должны всегда проверять, не находится ли productID в этой очереди. Если он просто возвращается без вывода сообщений, если это не так, это означает, что он никогда не обрабатывался, поэтому поместите это сообщение в другую Очередь: например, в Очередь2, а затем включите одновременных потребителей во второй Очереди Очереди2. Это все еще имеет недостатки: сначала очередь productID нужно как-то очищать время от времени, иначе она будет расти вечно, но это очень сложно. Сложная часть: что, если у вас есть productID в очереди productID, но продукт пришел для ОБНОВЛЕНИЯ в БД, а не INSERT? Вы не должны отклонять это тогда ...

...