Я использую Spring + MQ + Сервер приложений Websphere.
Я хочу использовать сообщения из MQ асинхронно и объединять сообщения, чтобы иметь список сообщений, чтобы было легко сохранять N номеров сущностей в базе данных в каждом отдельном коммите. (без выделения моей целевой базы данных Oracle слишком большим количеством коммитов)
Я использую DefaultMessageListenerContainer и синхронизировал метод onMessage, чтобы добавить сообщения (до размера пакета), и создаю поток, ожидающий выполнения условия (время / размер), и отправляю сообщения в другой поток, который выполняет Бизнес-логика и БД сохраняются.
Условие начала резьбы:
Как только первое сообщение поступило в метод onMessage, поток должен дождаться получения 25 сообщений в течение 1000 миллисекунд, а если в течение 1000 миллисекунд не было получено 25 сообщений, он передает доступные номера сообщений в другой поток.
Выпуск:
Я вижу, что поток запускается только во время настройки сервера, а не при первом вызове метода onMessage.
Какие-либо предложения / другим способом, пожалуйста, чтобы добиться сбора сообщений из очереди?
applicationContext.xml
<bean id="myMessageListener" class="org.mypackage.MyMessageListener">
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" ref="queue"/>
<property name="messageListener" ref="myMessageListener"/>
<property name ="concurrentConsumers" value ="10"/>
<property name ="maxConcurrentConsumers" value ="50"/>
</bean>
Слушатель:
package org.mypackage.MyMessageListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.mypackage.service.MyService;
public class MyMessageListener implements MessageListener {
private volatile long startTime = 0;
private volatile int messageCount;
private volatile List<String> messagesFromQueue = null;
private int batchSize = 25;
private long maximumBatchWaitTime = 1000;
@Autowired
private MyService myService;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
boolean threadRun = true;
while (threadRun) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println("InterruptedException is caught inside run method");
}
if ((messageCount >0 && messageCount == batchSize)) {
System.out.println("----Batch size Reached----");
threadRun = false;
processMsgsFromQueue(messagesFromQueue);
} else {
if (maximumBatchWaitTime > (System.currentTimeMillis() - startTime)) {
System.out.println("----Time limit is not reached----");
threadRun = true;
} else {
threadRun = false;
System.out.println("----Time limit is reached----");
processMsgsFromQueue(messagesFromQueue);
}
}
}
}
});
{
thread.start();
}
@Override
public synchronized void onMessage(Message message) {
if (messageCount == 0) {
startTime = System.currentTimeMillis();
messagesFromQueue = new ArrayList<String>();
System.out.println("----First Message Arrived at----"+startTime);
}
try {
messageCount++;
TextMessage tm = (TextMessage) message;
String msg = tm.getText();
messagesFromQueue.add(msg);
if (messageCount == 0) {
thread.start();
}
} catch (JMSException e1) {
e1.printStackTrace();
}
}
private void processMsgsFromQueue(List<String> messageFromQueue) {
System.out.println("Inside processMsgsFromQueue");
messageCount = 0;
messagesFromQueue = null;
if(!messageFromQueue.isEmpty()) {
this.myService.insertMsgsFromQueueToDB(messageFromQueue);
}
}
}