Нет событий onMessage во время выполнения (Java JMS MessageListener в очереди Oracle) - PullRequest
0 голосов
/ 09 ноября 2018

Назначение

Я создал MyListener.java для мониторинга своей очереди Oracle MY_QUEUE и MyConsumer.java реализовал мою собственную MessageListener.onMessage функциональность.
Как только я ставлю некоторые записи в MY_QUEUE, я хочу, чтобы MessageListener выводил «New Message ...» на консоль.

Проблема

Записи очереди будут обрабатываться только при первом запуске приложения. Если дополнительные записи ставятся в очередь, когда приложение уже запущено, функция MessageListener.onMessage не будет запущена.

Пример

  1. Запустить приложение с 5 записями уже в очереди. Выход:

    Initialized ...
    Спящий ...
    Новое сообщение ...
    Новое сообщение ...
    Новое сообщение ...
    Новое сообщение ...
    Новое сообщение ...
    Спящий ...
    Спящая ...

  2. Закройте приложение и запустите приложение. Поставьте записи в очередь во время выполнения. Выход:

    Initialized ...
    Спящий ...
    Спящий ...
    Спящий ...
    Спит ... (в данный момент вставлены записи в очередь)
    Спящий ...
    Спящий ...
    Спящий ...
    Спящая ...

  3. Выйдите из приложения и снова запустите приложение (записи из 2. все еще находятся в очереди). Выход:

    Initialized ...
    Спящий ...
    Новое сообщение ...
    Новое сообщение ...
    Новое сообщение ...
    Новое сообщение ...
    Новое сообщение ...
    Спящий ...
    Спящая ...

MyListener

package example;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

public class MyListener {

    private static final String QUEUE_NAME = "MY_QUEUE";
    private static final String QUEUE_USER = "myuser";
    private static final String QUEUE_PW = "mypassword";
    private QueueConnection queueConnection;
    private QueueSession queueSession;

    public MyListener() throws JMSException {
        QueueConnectionFactory QFac = AQjmsFactory.getQueueConnectionFactory("xxx.xxx.xxx.xxx", "orcl", 1521, "thin");
        this.queueConnection = QFac.createQueueConnection(QUEUE_USER, QUEUE_PW);
        this.queueSession = this.queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public static void main(String[] args) {

        try {
            MyListener myListener = new MyListener();
            Queue queue = ((AQjmsSession) myListener.queueSession).getQueue(QUEUE_USER, QUEUE_NAME);

            MessageConsumer mq = ((AQjmsSession) myListener.queueSession).createReceiver(queue);
            MyConsumer mc = new MyConsumer();
            mq.setMessageListener(mc);

            myListener.queueConnection.start();

            System.out.println("Initialized...");

            while (true) {
                try {
                    System.out.println("Sleeping...");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            System.out.println("Application closed");
        }

    }

}

MyConsumer

package example;

import javax.jms.Message;
import javax.jms.MessageListener;

public class MyConsumer implements MessageListener{

    @Override
    public void onMessage(Message arg0) {
        System.out.println("New Message...");

    }

}

PL / SQL-скрипт для постановки в очередь записей

DECLARE
   msg                  SYS.aq$_jms_text_message;
   enqueue_options      DBMS_AQ.ENQUEUE_OPTIONS_T;
   message_properties   DBMS_AQ.MESSAGE_PROPERTIES_T;
   message_handle       RAW (16);
   i                    NUMBER;
BEGIN
   msg := sys.aq$_jms_text_message.construct;
   msg.set_text ('Testmessage');
   enqueue_options.visibility := DBMS_AQ.immediate;
   message_properties.priority := 1;
   i := 0;

   WHILE i < 5
   LOOP
      DBMS_AQ.enqueue (queue_name           => 'MY_QUEUE',
                       enqueue_options      => enqueue_options,
                       message_properties   => message_properties,
                       payload              => msg,
                       msgid                => message_handle);
      i := i + 1;
   END LOOP;

   COMMIT;
END;

Дополнительная информация

База данных: Oracle 11g2
Java Runtime: 1.6
Зависимости Maven:

  • oracle-jdbc (11.2.0.4.0)
  • xdb (1,0)
  • Акапи (1,3)
  • jmscommon (1.3.1_02)

Может кто-нибудь сказать мне, почему функция onMessage не будет запущена, когда я ставлю новые записи в очередь во время выполнения?

РЕДАКТИРОВАТЬ: ОК, я прекратил использовать JMS и теперь использую старый подход AQ dequeue для асинхронного получения моих сообщений. Я мог бы вернуться и попытаться выяснить, почему он не работает с приведенным выше кодом, но сейчас это низкий приоритет.

1 Ответ

0 голосов
/ 20 ноября 2018

По существу, как только вы создаете AQjmsQueueReceiver и устанавливаете его прослушиватель сообщений, метод receive() завершается и AQjmsQueueReceiver выпадает из области видимости. Я предполагаю, что он вызывается из метода main, что также означает, что программа завершит работу. Вам необходимо:

  1. Измените ваше приложение так, чтобы ваши объекты JMS не выпадали из области видимости (потому что они будут собираться мусором).
  2. Запретить выход вашей программы во время ожидания сообщений.
...