Получение сообщений из MQ и слияние в Spring JMS - PullRequest
0 голосов
/ 03 апреля 2019

Я использую Spring + MQ + Сервер приложений Websphere.

Я хочу использовать сообщения из MQ асинхронно и объединять сообщения, чтобы иметь список сообщений, чтобы было легко сохранять N номеров сущностей в базе данных в каждом отдельном коммите. (без выделения моей целевой базы данных Oracle слишком большим количеством коммитов)

Я использую DefaultMessageListenerContainer и синхронизировал метод onMessage, чтобы добавить сообщения (до размера пакета), и создаю поток, ожидающий выполнения условия (время / размер), и отправляю сообщения в другой поток, который выполняет Бизнес-логика и БД сохраняются.

Условие начала резьбы:

Как только первое сообщение поступило в метод onMessage, поток должен дождаться получения 25 сообщений в течение 1000 миллисекунд, а если в течение 1000 миллисекунд не было получено 25 сообщений, он передает доступные номера сообщений в другой поток.

Выпуск:

Я вижу, что поток запускается только во время настройки сервера, а не при первом вызове метода onMessage.

Какие-либо предложения / другим способом, пожалуйста, чтобы добиться сбора сообщений из очереди?

applicationContext.xml

<bean id="myMessageListener" class="org.mypackage.MyMessageListener">

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" ref="queue"/>
    <property name="messageListener" ref="myMessageListener"/>
    <property name ="concurrentConsumers" value ="10"/>
    <property name ="maxConcurrentConsumers" value ="50"/>        
</bean>

Слушатель:

package org.mypackage.MyMessageListener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.mypackage.service.MyService;

public class MyMessageListener implements MessageListener {

    private volatile long startTime = 0;
    private volatile int messageCount;
    private volatile List<String> messagesFromQueue = null;

    private int batchSize = 25;
    private long maximumBatchWaitTime = 1000;

    @Autowired
    private MyService myService;

    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            boolean threadRun = true;
                while (threadRun) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException is caught inside run method");
                    }
                    if ((messageCount >0 && messageCount == batchSize)) {
                        System.out.println("----Batch size Reached----");
                        threadRun = false;
                        processMsgsFromQueue(messagesFromQueue);
                    } else {

                        if (maximumBatchWaitTime > (System.currentTimeMillis() - startTime)) {
                              System.out.println("----Time limit is not reached----");
                              threadRun = true;
                        } else {
                              threadRun = false;
                              System.out.println("----Time limit is reached----");
                              processMsgsFromQueue(messagesFromQueue);
                        }
                    }
               }
          }
      });

    {
       thread.start();
    }

    @Override
    public synchronized void onMessage(Message message) {
        if (messageCount == 0) {
            startTime = System.currentTimeMillis();
            messagesFromQueue = new ArrayList<String>();
            System.out.println("----First Message Arrived at----"+startTime);
        }
        try {
            messageCount++;
            TextMessage tm = (TextMessage) message;
            String msg = tm.getText();
            messagesFromQueue.add(msg);

            if (messageCount == 0) {
                thread.start();
            }

        } catch (JMSException e1) {
             e1.printStackTrace();
        }
    }

    private void processMsgsFromQueue(List<String> messageFromQueue) {
       System.out.println("Inside processMsgsFromQueue");
       messageCount = 0;
       messagesFromQueue =  null;
       if(!messageFromQueue.isEmpty()) {
        this.myService.insertMsgsFromQueueToDB(messageFromQueue);
       }
   }
}

1 Ответ

0 голосов
/ 04 апреля 2019

вам также нужно синхронизировать доступ к messagesFromQueue.

List messagesFromQueue = Collections.synchronizedList(new ArrayList());
      ...
  synchronized (messagesFromQueue) {
      Iterator i = messagesFromQueue.iterator(); // Must be in synchronized block
      while (i.hasNext())
      ...
  }

https://docs.oracle.com/javase/7/docs/api/java/util/Collections.html#synchronizedList(java.util.List)

при каждом вызове processMsgsFromQueue у вас будет исключение NullPointerException !!

    private void processMsgsFromQueue(List<String> messageFromQueue) {
       System.out.println("Inside processMsgsFromQueue");
       messageCount = 0;
       messagesFromQueue =  null;
       if(!messageFromQueue.isEmpty()/*messageFromQueue is null!!*/) {
        this.myService.insertMsgsFromQueueToDB(messageFromQueue);
       }
   }

лучше сохранять сообщения, и когда коммит в порядке, вы очищаете список и сбрасываете счетчик.

package org.mypackage.MyMessageListener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.mypackage.service.MyService;

public class MyMessageListener implements MessageListener {

    private volatile long startTime = 0;
    private volatile int messageCount;
    private volatile List<String> messagesFromQueue = null;

    private int batchSize = 25;
    private long maximumBatchWaitTime = 1000;

    @Autowired
    private MyService myService;

    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            boolean threadRun = true;
                while (threadRun) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException is caught inside run method");
                    }
                    if ((messageCount >0 && messageCount == batchSize)) {
                        System.out.println("----Batch size Reached----");
                        threadRun = false;
                        processMsgsFromQueue(messagesFromQueue);
                    } else {

                        if (maximumBatchWaitTime > (System.currentTimeMillis() - startTime)) {
                              System.out.println("----Time limit is not reached----");
                              threadRun = true;
                        } else {
                              threadRun = false;
                              System.out.println("----Time limit is reached----");
                              processMsgsFromQueue(messagesFromQueue);
                        }
                    }
               }
          }
      });


    @Override
    public synchronized void onMessage(Message message) {
        if (messageCount == 0) {
            startTime = System.currentTimeMillis();
            messagesFromQueue = new ArrayList<String>();
            System.out.println("----First Message Arrived at----"+startTime);
        }
        try {
            messageCount++;
            TextMessage tm = (TextMessage) message;
            String msg = tm.getText();
            messagesFromQueue.add(msg);

            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }

        } catch (JMSException e1) {
             e1.printStackTrace();
        }
    }

    private void processMsgsFromQueue(List<String> messageFromQueue) {
       System.out.println("Inside processMsgsFromQueue");
       if(!messageFromQueue.isEmpty()) {
        this.myService.insertMsgsFromQueueToDB(messageFromQueue);
       }
       messageCount = 0;
       messagesFromQueue =  null;
   }
}
...