JMS 2.0 - Как получать сообщения из темы с общими пользователями? - PullRequest
0 голосов
/ 19 ноября 2018

Я использую ActiveMQ Artemis и JMS 2.0 для чтения сообщений с общими пользователями. У меня два вопроса:

  1. Можно ли использовать конфигурацию в формате xml.
  2. Когда я устанавливаю прослушиватель сообщений для потребителя, обязательно ли использовать цикл while? Если я не использую цикл while (true), программа прекратит работу, если в теме нет сообщений.

SharedConsumer.java

public class SharedConsumer {
    @Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
    ConnectionFactory connectionFactory;

    public String maxConnectionForJSON;

    public void readFromTopicAndSendToQueue()throws Exception{
        Context initialContext = null;
        JMSContext jmsContext = null;
        int maxConnectionCount = 0;

        maxConnectionForJSON = "30";

        if (!StringUtils.isBlank(maxConnectionForJSON)){
            try{
                maxConnectionCount = Integer.parseInt(maxConnectionForJSON);
            }catch (Exception e){
                //logging
            }
        }
        if (maxConnectionCount != 0) {
            try {
                List<JMSConsumer> jmsConsumerList = new ArrayList<>();
                initialContext = new InitialContext();

                Topic topic = (Topic) initialContext.lookup("topic/exampleTopic");

                ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");

                jmsContext = cf.createContext("admin", "admin");

                for (int i = 0; i < maxConnectionCount; i++){
                    JMSConsumer jmsConsumer = jmsContext.createSharedDurableConsumer(topic, "ct");
                    MessageListener listener = new Listener();
                    jmsConsumer.setMessageListener(listener);

                }
                while (true) {
                    Thread.sleep(30000);
                }
            } catch (Exception e) {
                System.err.println(e.getMessage());
            } finally {
                 if (initialContext != null) {
                     initialContext.close();
                 }
                 if (jmsContext != null) {
                     jmsContext.close();
                 }
            }
        }
    }

    public static void main(final String[] args) throws Exception {
        SharedConsumer sharedConsumer = new SharedConsumer();
        sharedConsumer.readFromTopicAndSendToQueue();
    }
}

SharedConsumerListener.java

public class Listener implements MessageListener {
    public static int count = 0;

    @Override
    public void onMessage(Message message) {
        System.out.println(message.toString() + "\ncount :" + count);
        count++;
    }

} * * тысяча двадцать-один

Я мог бы использовать XML-файл для чтения очереди в JMS 1.1 (ActiveMQ). Я думал, что мы могли бы использовать с файлом конфигурации, как показано ниже в JMS 2.0 Artemis, но я ошибся Большое спасибо за вашу помощь Джастин Бертрам.

в файле конфигурации JMS 1.1

<bean id="brokerUrl" class="java.lang.String">
   <constructor-arg value="#{appProperties.queueUrl}"/>
</bean>

<amq:connectionFactory id="amqConnectionFactory" brokerURL="#brokerUrl" dispatchAsync="true"/>

<bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
   <constructor-arg ref="amqConnectionFactory"/>
   <property name="maxConnections" value="#{appProperties.maxConnections}"/>
   <property name="idleTimeout" value="#{appProperties.idleTimeout}"/>
   <property name="maximumActiveSessionPerConnection" value = "10"/> 

</bean>

<bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
   <constructor-arg ref="connectionFactory1"/>
</bean>

<bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
   <constructor-arg value="#{appProperties.queueName}"/>
</bean>

<task:executor id="mainExecutorForJSON" pool-size="#{appProperties.mainExecutorForJSONPoolSize}"
               queue-capacity="0" rejection-policy="CALLER_RUNS"/>

<int:channel id="jmsInChannelForJSON" >
    <int:dispatcher task-executor="mainExecutorForJSON"/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsInForJSON" destination="jSONNrtQueue" channel="jmsInChannelForJSON"
                                        concurrent-consumers="#{appProperties.concurrentConsumerCountForJSON}" />

<int:service-activator input-channel="jmsInChannelForJSON" ref="dataServiceJMS" />

1 Ответ

0 голосов
/ 19 ноября 2018

Короче говоря, да, это нормально, чтобы предотвратить завершение программы после установки прослушивателя сообщений потребителя JMS.

Когда вы создаете JMS-потребителя и устанавливаете его прослушиватель сообщений, реализация JMS-клиента создаст новые потоки в фоновом режиме для асинхронного прослушивания сообщений из потока, который создал получателя и установит прослушиватель. Поэтому поток, который создает потребителя и устанавливает слушателя, будет просто продолжен. В вашем случае вам нужно каким-то образом остановить выход и завершение работы потока, поэтому вам необходим цикл while.

...