Как читать сообщения из темы с несколькими потребителями? - PullRequest
0 голосов
/ 12 ноября 2018

Я читаю из одной темы с 10 потребителями и отправляю эти сообщения в одну очередь. Когда я посылаю 50 сообщений в тему с помощью jmeter, в очереди 500 сообщений. Итак, каждый потребитель читает одинаковые сообщения из темы и после отправки в очередь. Может ли каждый потребитель читать разные сообщения из темы?

Большое спасибо.

JmsConfig.java

@Configuration
@EnableJms
@ComponentScan(basePackages = "com.jms.config")
public class JmsConfig {
    String BROKER_URL = "tcp://localhost:61616";
    String BROKER_USERNAME = "admin";
    String BROKER_PASSWORD = "admin";

    @Bean
    public ActiveMQConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(BROKER_URL);
        connectionFactory.setPassword(BROKER_USERNAME);
        connectionFactory.setUserName(BROKER_PASSWORD);
        return connectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory());
        template.setPubSubDomain(true);
        return template;
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1-10");
        factory.setPubSubDomain(true);
        return factory;
    }

JmsSender.java

@Service
public class JmsSender{

    private JmsTemplate jmsTemplate;

    @Value("#{appProperties.toQueueName}")
    private String queueName;

    @Autowired
    private ApplicationContextUtil applicationContextUtil;

    public void send(String rawData){
        getJmsTemplate().send(queueName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                 return session.createObjectMessage(rawData);
            }
        });
    }
    public JmsTemplate getJmsTemplate(){
        if (jmsTemplate == null){
            jmsTemplate = (JmsTemplate) applicationContextUtil.getBeanFromAppContext("jmsForQueue");
        }
        return  jmsTemplate;
    }
}

Worker.java

@Component
public class Worker {

    @Autowired
    private JmsSender jmsSender;

    @JmsListener(destination = "#{appProperties.fromTopicName}")
    public String receiveMessageFromTopic(final String jsonMessage) throws JMSException {
        System.out.println("Received message " + jsonMessage);
        jmsSender.send(jsonMessage);
        return response;
    }
}

QueueConfig.xml

<?xml version="1.0" encoding="UTF-8"?>
   <beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:aop="http://www.springframework.org/schema/aop"
   xmlns:context="http://www.springframework.org/schema/context"
   xmlns:tx="http://www.springframework.org/schema/tx"
   xmlns:mvc="http://www.springframework.org/schema/mvc"
   xmlns:task="http://www.springframework.org/schema/task"
   xmlns:amq="http://activemq.apache.org/schema/core"
   xmlns:cache="http://www.springframework.org/schema/cache"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
   xmlns:jms="http://www.springframework.org/schema/jms"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
   http://www.springframework.org/schema/aop
   http://www.springframework.org/schema/aop/spring-aop-4.3.xsd
   http://www.springframework.org/schema/context
   http://www.springframework.org/schema/context/spring-context-4.3.xsd
   http://www.springframework.org/schema/tx
   http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
   http://activemq.apache.org/schema/core
   http://activemq.apache.org/schema/core/activemq-core-5.4.0.xsd
   http://www.springframework.org/schema/task
   http://www.springframework.org/schema/task/spring-task-4.3.xsd
   http://www.springframework.org/schema/mvc
   http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd
   http://www.springframework.org/schema/cache
   http://www.springframework.org/schema/cache/spring-cache.xsd
   http://www.springframework.org/schema/integration
   http://www.springframework.org/schema/integration/spring-integration.xsd
   http://www.springframework.org/schema/integration/jms
   http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.3.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"
   default-lazy-init="false">     

   <bean id="brokerUrl" class="java.lang.String">
       <constructor-arg value="#{appProperties.queueUrl}"/>
   </bean>

   <amq:connectionFactory id="amqConnectionFactory" brokerURL="#brokerUrl" dispatchAsync="true"/>

   <bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
       <constructor-arg ref="amqConnectionFactory"/>
       <property name="maxConnections" value="#{appProperties.maxConnections}"/>
       <property name="idleTimeout" value="#{appProperties.idleTimeout}"/>
       <property name="maximumActiveSessionPerConnection" value = "10"/> 

   </bean>

   <bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
       <constructor-arg ref="connectionFactory1"/>
   </bean>

   <bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
       <constructor-arg value="#{appProperties.toQueueName}"/>
   </bean>
</beans>

1 Ответ

0 голосов
/ 12 ноября 2018

Ожидаемое вами поведение. Тема JMS следует семантике публикации / подписки (т.е. pub / sub), где все подписчики получают все сообщения, отправленные в тему. В вашем случае у вас есть 10 подписчиков, и вы отправляете 50 сообщений. Каждый из этих 10 подписчиков получает каждое из 50 сообщений (в соответствии с семантикой pub / sub) и затем пересылает его в очередь. Таким образом, очередь получает 500 сообщений.

Если вы хотите, чтобы все потребители делились всеми сообщениями, вам не следует использовать тему JMS, а скорее очередь JMS.

...