Назначение
Я создал MyListener.java
для мониторинга своей очереди Oracle MY_QUEUE и MyConsumer.java
реализовал мою собственную MessageListener.onMessage
функциональность.
Как только я ставлю некоторые записи в MY_QUEUE, я хочу, чтобы MessageListener выводил «New Message ...» на консоль.
Проблема
Записи очереди будут обрабатываться только при первом запуске приложения. Если дополнительные записи ставятся в очередь, когда приложение уже запущено, функция MessageListener.onMessage не будет запущена.
Пример
Запустить приложение с 5 записями уже в очереди. Выход:
Initialized ...
Спящий ...
Новое сообщение ...
Новое сообщение ...
Новое сообщение ...
Новое сообщение ...
Новое сообщение ...
Спящий ...
Спящая ...
Закройте приложение и запустите приложение. Поставьте записи в очередь во время выполнения. Выход:
Initialized ...
Спящий ...
Спящий ...
Спящий ...
Спит ... (в данный момент вставлены записи в очередь)
Спящий ...
Спящий ...
Спящий ...
Спящая ...
Выйдите из приложения и снова запустите приложение (записи из 2. все еще находятся в очереди). Выход:
Initialized ...
Спящий ...
Новое сообщение ...
Новое сообщение ...
Новое сообщение ...
Новое сообщение ...
Новое сообщение ...
Спящий ...
Спящая ...
MyListener
package example;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
public class MyListener {
private static final String QUEUE_NAME = "MY_QUEUE";
private static final String QUEUE_USER = "myuser";
private static final String QUEUE_PW = "mypassword";
private QueueConnection queueConnection;
private QueueSession queueSession;
public MyListener() throws JMSException {
QueueConnectionFactory QFac = AQjmsFactory.getQueueConnectionFactory("xxx.xxx.xxx.xxx", "orcl", 1521, "thin");
this.queueConnection = QFac.createQueueConnection(QUEUE_USER, QUEUE_PW);
this.queueSession = this.queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
}
public static void main(String[] args) {
try {
MyListener myListener = new MyListener();
Queue queue = ((AQjmsSession) myListener.queueSession).getQueue(QUEUE_USER, QUEUE_NAME);
MessageConsumer mq = ((AQjmsSession) myListener.queueSession).createReceiver(queue);
MyConsumer mc = new MyConsumer();
mq.setMessageListener(mc);
myListener.queueConnection.start();
System.out.println("Initialized...");
while (true) {
try {
System.out.println("Sleeping...");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
System.out.println("Application closed");
}
}
}
MyConsumer
package example;
import javax.jms.Message;
import javax.jms.MessageListener;
public class MyConsumer implements MessageListener{
@Override
public void onMessage(Message arg0) {
System.out.println("New Message...");
}
}
PL / SQL-скрипт для постановки в очередь записей
DECLARE
msg SYS.aq$_jms_text_message;
enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle RAW (16);
i NUMBER;
BEGIN
msg := sys.aq$_jms_text_message.construct;
msg.set_text ('Testmessage');
enqueue_options.visibility := DBMS_AQ.immediate;
message_properties.priority := 1;
i := 0;
WHILE i < 5
LOOP
DBMS_AQ.enqueue (queue_name => 'MY_QUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => msg,
msgid => message_handle);
i := i + 1;
END LOOP;
COMMIT;
END;
Дополнительная информация
База данных: Oracle 11g2
Java Runtime: 1.6
Зависимости Maven:
- oracle-jdbc (11.2.0.4.0)
- xdb (1,0)
- Акапи (1,3)
- jmscommon (1.3.1_02)
Может кто-нибудь сказать мне, почему функция onMessage не будет запущена, когда я ставлю новые записи в очередь во время выполнения?
РЕДАКТИРОВАТЬ: ОК, я прекратил использовать JMS и теперь использую старый подход AQ dequeue для асинхронного получения моих сообщений. Я мог бы вернуться и попытаться выяснить, почему он не работает с приведенным выше кодом, но сейчас это низкий приоритет.