JMS 2.0 - MQ 9 - тема Общая подписка не работает - PullRequest
0 голосов
/ 13 февраля 2019

Я сталкиваюсь с проблемой при разработке приложения, которое подписывается на MQ Topic (MQ version 9).

Мне нужно подключиться к общей теме, потому что приложение будет запускаться в нескольких экземплярах (кластер).

В спецификациях и документации говорится: «Недолговременная общая подписка используется клиентом, который должен иметь возможность делиться работой по получению сообщений из тематической подписки между несколькими потребителями. Недолговременный общий доступследовательно, подписка может иметь более одного потребителя. Каждое сообщение из подписки будет доставлено только одному из потребителей этой подписки. "

Для меня все клиенты, использующие одно и то же имя подписки, находятся в одном и том же"cluster ", только один клиент получит сообщение за один раз.

В моем коде, вдохновленном этой статьей , у меня есть исключение, когда второй клиент пытается создать общийподписка.Я действительно не понимаю, является ли это ошибкой в ​​реализации клиентских библиотек MQ или в моем коде.

Вот мой пример кода:

    import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;

import com.ibm.mq.jms.MQTopicConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;

public class TestGB2 {

    public static void main(final String[] args) throws Exception {

        for (int i = 0; i < 10; i++) {
            new Thread(new MyThread("THREAD" + i, "TESTSUB/#", "myClient", "SUBTEST")).start();
        }

    }

    public static class MyThread implements Runnable {

        private final String topicString;
        private final String clientId;
        private final String subscriptionName;

        public MyThread(final String threadName, final String topicString, final String clientId, final String subscriptionName) {
            Thread.currentThread().setName(threadName);
            this.topicString = topicString;
            this.clientId = clientId;
            this.subscriptionName = subscriptionName;
        }

        @Override
        public void run() {

            try {
                System.out.println(String.format("%s : Connecting...", Thread.currentThread().getName()));
                MQTopicConnectionFactory cf = new MQTopicConnectionFactory();
                cf.setHostName("xxxx");
                cf.setPort(1416);
                cf.setQueueManager("xxxx");
                cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
                cf.setChannel("xxx");
                cf.setClientID(clientId);

                Connection con = cf.createConnection();

                Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
                con.start();
                Topic topic = session.createTopic(topicString);
                MessageConsumer messageConsumer = session.createSharedConsumer(topic, subscriptionName); // fail here

                System.out.println(String.format("%s : Waiting for a message...", Thread.currentThread().getName()));
                Message msg = messageConsumer.receive();
                System.out.println(String.format("%s : Received :\n%s", Thread.currentThread().getName(), msg));

            }
            catch (Exception ex) {
                System.out.println(String.format("%s : FAILED", Thread.currentThread().getName()));
                ex.printStackTrace();
            }

        }

    }
}

Приведенный ниже код пытается создать 10 потоков, потребляющихсообщения на ту же тему.Только первый поток может подключиться, все остальные терпят неудачу со следующим исключением:

    com.ibm.msg.client.jms.DetailedIllegalStateException: JMSWMQ0026: Failed to subscribe to topic 'TESTSUB' with selector 'none' using MQSUB.
There may have been a problem creating the subscription due to it being used by another message consumer.
Make sure any message consumers using this subscription are closed before trying to create a new subscription under the same name. Please see the linked exception for more information.
    at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:472)
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:214)
    at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:212)
    at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:112)
    at com.ibm.msg.client.wmq.internal.WMQConsumerShadow.initialize(WMQConsumerShadow.java:1038)
    at com.ibm.msg.client.wmq.internal.WMQSyncConsumerShadow.initialize(WMQSyncConsumerShadow.java:134)
    at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.<init>(WMQMessageConsumer.java:470)
    at com.ibm.msg.client.wmq.internal.WMQSession.createSharedConsumer(WMQSession.java:938)
    at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4228)
    at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4125)
    at com.ibm.mq.jms.MQSession.createSharedConsumer(MQSession.java:1319)
    at TestGB.lambda$0(TestGB.java:33)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2042' ('MQRC_OBJECT_IN_USE').
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:202)
    ... 11 more

Пробовал с последней библиотекой:

<dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>com.ibm.mq.allclient</artifactId>
    <version>9.1.1.0</version>
</dependency>

1 Ответ

0 голосов
/ 14 февраля 2019

Краткое описание проблемы

Проблема не в вашей программе, проблема в очереди моделей, связанной с темой, на которую вы подписываетесь.

В диспетчере очереди, если вы посмотритеу объекта темы, которому будет соответствовать ваша подписка, у него будет параметр MNDURMDL, указывающий на очередь модели.

Если вы посмотрите на очередь модели, вы заметите два параметра, в которых один или оба могут вызватьошибка, которую вы получаете:

[ DEFSOPT( EXCL | SHARED ) ]
[ SHARE | NOSHARE ]

Они должны быть установлены на DEFSOPT(SHARED) и SHARE.Если для одного из них задано другое значение, у вас будет только один подписчик в общей подписке.


Дополнительные сведения о причине проблемы

В IBM MQ Pub/ Sub, когда вы создаете подписку JMS, MQ рассматривает это как управляемую подписку, в фоновом режиме IBM MQ создаст временную очередь для подписки на строку темы.Если это недолговременная подписка, очередь является временной динамической очередью.

Причина сбоя заключается в том, что первый поток откроет временную динамическую очередь в монопольном режиме, а другие потоки не смогут открытьвременная динамическая очередь, и вы получаете сообщение об ошибке MQRC_OBJECT_IN_USE.


Возможная причина, при которой была создана очередь модели MNDURMDL для конкретного приложения

Я подозреваю, что причина в том, что IBMс несколькими различными очередями моделей по умолчанию.

По умолчанию для недолговечного абонента установлены следующие параметры:

   QUEUE(SYSTEM.NDURABLE.MODEL.QUEUE)      TYPE(QMODEL)
   DEFSOPT(SHARED)                         SHARE

Существует еще одна очередь по умолчанию, которая не является специфичной для публикации / подписки и имеет этинастройки:

   QUEUE(SYSTEM.DEFAULT.MODEL.QUEUE)       TYPE(QMODEL)
   DEFSOPT(EXCL)                           NOSHARE

Вероятно, очередь модели, созданная для использования вашим тематическим объектом, была создана с помощью команды, подобной следующей, которая по умолчанию будет использовать настройку SYSTEM.DEFAULT.MODEL.QUEUE.:

DEFINE QMODEL(xxx)

В будущем вы можете либо специально установить эти два параметра, либо определить его с помощью ключевого слова LIKE, чтобы заставить егоиспользуйте другую очередь для моделирования параметров, обе команды приведены ниже:

DEFINE QMODEL(xxx) DEFSOPT(SHARED) SHARE
DEFINE QMODEL(xxx) LIKE(SYSTEM.NDURABLE.MODEL.QUEUE)

Дополнительные сведения о создании и использовании специфических для приложения объектов TOPIC и очередей MODEL

По умолчанию корневой каталогузел дерева представлен стандартным объектом TOPIC с именем SYSTEM.BASE.TOPIC, очереди модели по умолчанию, связанные с этой темой, показаны ниже:

   TOPIC(SYSTEM.BASE.TOPIC)                TYPE(LOCAL)
   TOPICSTR()                              MDURMDL(SYSTEM.DURABLE.MODEL.QUEUE)
   MNDURMDL(SYSTEM.NDURABLE.MODEL.QUEUE)

Если вы не определите никаких дополнительных административных объектов TOPIC, то всетемы совпадают с SYSTEM.BASE.TOPIC.Кроме того, если вы не определяете какие-либо дополнительные административные объекты TOPIC и хотите предоставить разрешение приложения для определенного подмножества дерева тем (например, строк тем, начинающихся с TESTSUB), вы должны предоставить разрешения через SYSTEM.BASE.TOPIC, это вturn предоставляет приложению доступ к любой произвольной строке темы без ограничений.

Рекомендуется создать объект TOPIC со строкой темы, которая соответствует части дерева тем, к которой приложение должно иметь доступ.Для вашего примера TESTSUB/#, если ваш администратор определил новый объект TOPIC и указал TOPICSTR(TESTSUB), значения по умолчанию создадут его следующим образом:

   TOPIC(TESTSUB.TOPIC)                    TYPE(LOCAL)
   TOPICSTR(TESTSUB)                       MDURMDL( )
   MNDURMDL( )

пустые значения MDURMDL и MNDURMDLскажите MQ использовать значение из ближайшего более высокого объекта темы в дереве, если больше ничего не определено, это будет SYSTEM.BASE.TOPIC, а очереди модели по-прежнему будут использовать очереди моделей SYSTEM.DURABLE.MODEL.QUEUE и SYSTEM.NDURABLE.MODEL.QUEUE.

Администратор может вместо этого создать объект TOPIC и указать различные очереди моделей, например:

   TOPIC(TESTSUB.TOPIC)                    TYPE(LOCAL)
   TOPICSTR(TESTSUB)                       MDURMDL(TESTSUB.DURABLE.MODEL.QUEUE)
   MNDURMDL(TESTSUB.NDURABLE.MODEL.QUEUE)

Таким образом, они могут определять очереди моделей для конкретных приложений, которые имеют параметры, необходимые для общих подписок, а невоздействовать на очереди модели SYSTEM.Другое преимущество заключается в том, что они могут предоставлять приложениям разрешения только для строк тем, начинающихся с TESTSUB, например TESTSUB/A или TESTSUB/B или TESTSUB/X/Y/Z.

...