Для респондента в шаблоне запрос-ответ, какую очередь следует использовать для создания потребителя? - PullRequest
0 голосов
/ 31 мая 2019

Например, у меня есть IBM MQ с REQUESTQ и RESPONSEQ, когда я отправляю запрос в REQUESTQ, мне нужно получить ответ от RESPONSEQ.На основе этого кода ниже:

package requestReply;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.wmq.WMQConstants;

/*
 * Implementation of requester class
 */
class Requestor implements Runnable {
       private Thread t;
       private String threadName;

       Requestor( String name){
           threadName = name;
           System.out.println("Creating Thread:" +  threadName );
       }

       public void run() {
            JmsConnectionFactory cf = null;
            Connection connection = null;
            Session session = null;
            Destination reqQ = null;
            Destination repQ = null;
            MessageProducer producer = null;
            MessageConsumer consumer = null;

            try {
              // Create a connection factory
              JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
              cf = ff.createConnectionFactory();

              // Set the properties
              cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_BINDINGS);
              cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, "QM2");

              // Create JMS objects
              connection = cf.createConnection();
              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

              // Create destination to send requests
              reqQ = session.createQueue("queue:///REQUESTQ");
              // Create destination to read replies
              repQ = session.createQueue("queue:///REPLYQ");

              // Create producer
              producer = session.createProducer(reqQ);

              // Create a request message
              Message requestMessage = session.createTextMessage("Requesting a service");
              // Tell the responder where to put replies.
              requestMessage.setJMSReplyTo(repQ);
              // Send it off
              producer.send(requestMessage);

              // Get only that reply that matches my request message id.
              String selector = "JMSCorrelationID='" + requestMessage.getJMSMessageID()+"'";

              // Create consumer with selector
              consumer = session.createConsumer(repQ, selector);

              // Start the connection
              connection.start();

              // Get the message
              Message receivedMessage = consumer.receive(35000);
              if(receivedMessage != null)
                  System.out.println("\nRequestor received message:\n" + receivedMessage);
              else
                  System.out.println("No message received");
            }catch(Exception ex){
                System.out.println(threadName);
                System.out.println(ex);
            }
       }

       // Start thread
       public void start ()
       {
          System.out.println("Starting " +  threadName );
          if (t == null)
          {
             t = new Thread (this, threadName);
             t.start ();
          }
       }

    }

/*
 * Implementation of Responder class
 */
class Responder implements Runnable {
       private Thread t;
       private String threadName;

       Responder( String name){
           threadName = name;
           System.out.println("Creating Thread: " +  threadName );
       }

       public void run() {
            JmsConnectionFactory cf = null;
            Connection connection = null;
            Session session = null;
            Destination reqQ = null;
            Destination repQ = null;
            MessageProducer producer = null;
            MessageConsumer consumer = null;

            try {
              // Create a connection factory
              JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
              cf = ff.createConnectionFactory();

              // Set the properties
              cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_BINDINGS);
              cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, "QM2");

              // Create JMS objects
              connection = cf.createConnection();
              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
              reqQ = session.createQueue("queue:///REQUESTQ");

              // Create consumer to read requests
              consumer = session.createConsumer(reqQ);

              // Start the connection
              connection.start();

              // Loop to read requests and respond
              while(true){
              Message receivedMessage = consumer.receive(35000);
              if(receivedMessage != null){
                  System.out.println("\nResponder received message:\n" + receivedMessage);
                  repQ = receivedMessage.getJMSReplyTo();
                  producer = session.createProducer(repQ);
                  Message requestMessage = session.createTextMessage("Responder service");
                  requestMessage.setJMSCorrelationID(receivedMessage.getJMSMessageID());
                  producer.send(requestMessage);
              }
              else
                  System.out.println("No message received");
              }
            }catch(Exception ex){
                System.out.println(threadName);
                System.out.println(ex);
            }
       }

       public void start ()
       {
          System.out.println("Starting " +  threadName );
          if (t == null)
          {
             t = new Thread (this, threadName);
             t.start ();
          }
       }

    }

public class ReqRep {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Requestor req = new Requestor( "Requester");
        req.start();

        Responder rep = new Responder( "Responder");
        rep.start();
    }
}

[Код был скопирован из: https://www.ibm.com/developerworks/community/blogs/messaging/entry/jms_request_reply_sample?lang=en]

Что я понимаю из этого кода, когда поток Requestor отправляет запрос на REQUESTQ, Responder извлечет информацию из REQUESTQ и повторно отправит ее на RESPONSEQ, чтобы Requestor мог получить фактический ответ от RESPONDERQ?

Я спрашиваю об этом, потому что, когда я действительно пытался, Requestor и Responder всегда зависали, чтобы получить ответ, который имеет смысл для меня, потому что, как только запрос находится в REQUESTQ, MQслужба уже избавилась от этого сообщения, обработала его и уже нажала на RESPONSEQ, а поскольку Requester ожидает Responder, а Responder не может извлечь из REQUESTQ ничего, что вызывает зависание.(Поправьте меня если я ошибаюсь)

1 Ответ

0 голосов
/ 02 июня 2019

Попробовал его на моей машине, и иногда он работал, как ожидалось, иногда зависал 35 секунд, пока сообщения не передавались.

Это проблема синхронизации при запуске Responder.Если Connection и Session установлены слишком рано, он не видит сообщения в первом receive и получает их только после первых 35 секунд ожидания.Чтобы преодолеть это, просто подождите немного между req.start() и rep.start(), тогда он будет работать согласованно.

Я рекомендую прочитать официальные учебные пособия IBM, например Точка-точка с JMS .В качестве плюса, он использует JMS 2.0, что упрощает программирование JMS.

...