Как обернуть мост JMS в WebSphere MQ в синхронный вызов, используя шаблон запроса-ответа? - PullRequest
1 голос
/ 09 декабря 2010

Я просто имею дело с новым сценарием для меня, который, я полагаю, может быть обычным для некоторых:) ..

В соответствии с требованиями мне нужно создать пользовательский опыт, чтобы быть похожим на синхронный онлайнтранзакция для вызова веб-службы, которая фактически делегирует вызов серии IBM MQ с помощью асинхронного моста JMS-MQ.

Клиент вызывает веб-сервис, после чего его сообщение должно быть опубликовано в очереди JMS на сервере приложений, которая будет доставлена ​​в WebSphere MQ, а затем после обработки ответ будет доставлен обратно на сервер приложений в FIXED JMS.конечная точка очереди.

Требование относится к этой транзакции, для которой потребуется тайм-аут в случае, если WebSphere MQ не доставит ответ через определенный промежуток времени, чем веб-сервис должен отправить клиенту сигнал тайм-аута и игнорировать этотранзакция.

Эскиз задачи следующим образом.

Мне нужно заблокировать запрос в веб-сервисе до тех пор, пока не придет ответ или не истечет время ожидания.

Чем я ищу какую-нибудь открытую библиотеку, чтобы помочь мне в этой задаче.Или единственное решение - заблокировать поток и сохранить пул для ответа?Может быть, я мог бы реализовать какой-то блок со слушателем, чтобы получать уведомления, когда приходит ответ?

Немного обсуждения было бы очень полезно для меня сейчас, чтобы попытаться прояснить мои идеи по этому вопросу.Есть предложения?

У меня есть эскиз, который, я надеюсь, поможет очистить картинку;)

alt text

Ответы [ 2 ]

2 голосов
/ 10 января 2011

Привет, спасибо за публикацию собственного решения!

Да, receive () с таймаутом - самый элегантный способ в этом случае.

Остерегайтесь того, что происходит с сообщениями, которые не читаются из-за истечения времени ожидания. Если ваш клиент снова получит ту же очередь, он может получить устаревшее сообщение.

Удостоверьтесь, что сообщения, для которых истекло время ожидания, своевременно удаляются (если по какой-либо другой причине не заполнять очередь необработанными сообщениями).

Вы можете легко сделать это либо с помощью кода (установив время жизни для производителя сообщений), либо на сервере Websphere MQ (используя очереди, которые автоматически истекают сообщения).

Последнее проще, если вы не можете / не хотите изменять сторону кода MQ. Это то, что я бы сделал:)

1 голос
/ 11 декабря 2010

после пары дней кодирования я нашел решение для этого. Я использую стандартный EJB3 с аннотациями JAX-WS и Standard JMS.

Ниже приведен код, который я написал для соответствия требованиям. Это Session Bean без состояния с управляемой bean-транзакцией (BMT), так как использование стандартной управляемой контейнерной транзакции (CMT) вызывало некоторую зависание, я полагаю, потому что я пытался поместить оба взаимодействия JMS в одну и ту же транзакцию, как они находятся в тот же метод, поэтому обратите внимание, что мне пришлось начинать и завершать транзакции для каждого взаимодействия с очередями JMS. Я использую weblogic для этого решения. И я также кодировал MDB, который в основном потребляет сообщение из конечной точки очереди jms / Pergunta и помещает ответное сообщение в очередь jms / Resposta. Я сделал это, чтобы высмеивать ожидаемое поведение на стороне MQ этой проблемы. На самом деле в реальном сценарии у нас, вероятно, было бы какое-нибудь приложение COBOL на мэйнфрейме или даже другое Java-приложение, работающее с сообщениями и помещающее ответ в очередь ответов.

Если кому-то нужно попробовать этот код в основном, все, что вам нужно, это иметь контейнер J2EE5 и настроить 2 очереди с именами jndi: jms / Pergunta и jms / Resposta.

Код EJB / Webservice:

@Stateless
@TransactionManagement(TransactionManagementType.BEAN)
@WebService(name="DJOWebService")
public class DJOSessionBeanWS implements DJOSessionBeanWSLocal {

    Logger log = Logger.getLogger(DJOSessionBeanWS.class.getName());

    @Resource
    SessionContext ejbContext;

    // Defines the JMS connection factory.
    public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";

    // Defines request queue
    public final static String QUEUE_PERG = "jms/Pergunta";

    // Defines response queue
    public final static String QUEUE_RESP = "jms/Resposta";


    Context ctx;
    QueueConnectionFactory qconFactory;

    /**
     * Default constructor. 
     */
    public DJOSessionBeanWS() {
        log.info("Construtor DJOSessionBeanWS");
    }

    @WebMethod(operationName = "processaMensagem")
    public String processaMensagem(String mensagemEntrada, String idUnica)
    {
        //gets UserTransaction reference as this is a BMT EJB.
        UserTransaction ut = ejbContext.getUserTransaction();
        try {

            ctx = new InitialContext();
            //get the factory before any transaction it is a weblogic resource.
            qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
            log.info("Got QueueConnectionFactory");
            ut.begin();
            QueueConnection qcon = qconFactory.createQueueConnection();
            QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue qs = (Queue) (new InitialContext().lookup("jms/Pergunta"));
            TextMessage message = qsession.createTextMessage("this is a request message");
            message.setJMSCorrelationID(idUnica);
            qsession.createSender(qs).send(message);
            ut.commit();
            qcon.close();
            //had to finish and start a new transaction, I decided also get new references for all JMS related objects, not sure if this is REALLY required
            ut.begin();
            QueueConnection queuecon = qconFactory.createQueueConnection();
            Queue qreceive = (Queue) (new InitialContext().lookup("jms/Resposta"));
            QueueSession queuesession = queuecon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            String messageSelector = "JMSCorrelationID = '" + idUnica + "'";
            //creates que receiver and sets a message selector to get only related message from the response queue.
                    QueueReceiver qr = queuesession.createReceiver(qreceive, messageSelector);
            queuecon.start();
            //sets the timeout to keep waiting for the response...
            TextMessage tresposta = (TextMessage) qr.receive(10000);
            if(tresposta != null)
            {
                ut.commit();
                queuecon.close();
                return(tresposta.toString());
            }
            else{
                //commints anyway.. does not have a response though 
                ut.commit();
                queuecon.close();
                log.info("null reply, returned by timeout..");
                return "Got no reponse message.";
            }



        } catch (Exception e) {
            log.severe("Unexpected error occurred ==>> " + e.getMessage());
            e.printStackTrace();
            try {
                ut.commit();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            return "Error committing transaction after some other error executing ==> " + e.getMessage();
        } 

    }
}   

И это код для MDB, который высмеивает сторону MQ этой проблемы. Во время моих тестов у меня был фрагмент Thread.sleep для имитации и проверки тайм-аута на стороне клиента для проверки решения, но он не представлен в этой версии.

/**
 * Mock to get message from request queue and publish a new one on the response queue.
 */
@MessageDriven(
        activationConfig = { @ActivationConfigProperty(
                propertyName = "destinationType", propertyValue = "javax.jms.Queue"
        ) }, 
        mappedName = "jms/Pergunta")
public class ConsomePerguntaPublicaRespostaMDB implements MessageListener {

    Logger log = Logger.getLogger(ConsomePerguntaPublicaRespostaMDB.class.getName());

    // Defines the JMS connection factory.
    public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";

    // Define Queue de resposta
    public final static String QUEUE_RESP = "jms/Resposta";


    Context ctx;
    QueueConnectionFactory qconFactory;



    /**
     * Default constructor. 
     */
    public ConsomePerguntaPublicaRespostaMDB() {
        log.info("Executou construtor ConsomePerguntaPublicaRespostaMDB");
        try {
            ctx = new InitialContext();
        } catch (NamingException e) {
            e.printStackTrace();
        }
    }

    /**
     * @see MessageListener#onMessage(Message)
     */
    public void onMessage(Message message) {
        log.info("Recuperou mensagem da fila jms/FilaPergunta, executando ConsomePerguntaPublicaRespostaMDB.onMessage");
        TextMessage tm = (TextMessage) message;

        try {
            log.info("Mensagem recebida no onMessage ==>> " + tm.getText());

            //pega id da mensagem na fila de pergunta para setar corretamente na fila de resposta.
             String idMensagem = tm.getJMSCorrelationID();
             log.info("Id de mensagem que sera usada na resposta ==>> " + idMensagem);

            qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
            log.info("Inicializou contexto jndi e deu lookup na QueueConnectionFactory do weblogic com sucesso. Enviando mensagem");
            QueueConnection qcon = qconFactory.createQueueConnection();
            QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = (Queue) (ctx.lookup("jms/Resposta"));
            TextMessage tmessage = qsession.createTextMessage("Mensagem jms para postar na fila de resposta...");
            tmessage.setJMSCorrelationID(idMensagem);
            qsession.createSender(queue).send(tmessage);
        } catch (JMSException e) {
            log.severe("Erro no onMessage ==>> " + e.getMessage());
            e.printStackTrace();
        }  catch (NamingException e) {
            log.severe("Erro no lookup ==>> " + e.getMessage());
            e.printStackTrace();
        }

    }

}

[] s

...