Лучший способ обработки ответов в onMessage () - PullRequest
0 голосов
/ 04 мая 2018

У меня есть автономный слушатель MQ, который слушает очередь. Каков наилучший способ обработки ответов в onMessage (). Я не хочу, чтобы моя бизнес-логика была в onMessage (). Также я не хочу, чтобы onMessage () ожидал анализа ответа и сохранения в БД.

public abstract class MQReceiver implements MessageListener{
    public void pollResults(Long counter) throws JMSException, InterruptedException {
        Queue rQueue = null;
        QueueSession session = null;
        QueueReceiver receiver;
        count = counter;        

        try{
            session =   connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            rQueue = session.createQueue(getReceiveQueue());

            receiver = session.createReceiver(rQueue);
            receiver.setMessageListener(this);
            connection.start();
            while(count > 0){
                logger.info("Waiting......Count >> " + count);
                Thread.sleep(SLEEPTIME);    
            }

            if(count == 0){
                session.close();
                logger.info("exiting poll results");
            }

        }finally{
            if(session != null)
                session.close();
        }

      }

@Override
    public void onMessage(Message message) {
             if (message instanceof TextMessage) {
                //Parse and Apply business logic
                //Store in DB
             }
}

Ответы [ 2 ]

0 голосов
/ 07 мая 2018

Но почему вы не хотите, чтобы поток OnMessage ждал, когда вы завершите свою работу? Это в значительной степени цель системы обмена сообщениями. Если вам не удастся вставить в БД из-за какой-либо ошибки, сообщение будет откатано и не потеряно - и может быть предпринята вторая попытка. Если вы перегрузите работу логики / базы данных в другом месте, вы потеряете отказоустойчивость.

В любом случае, лучший способ - не порождать потоки или аналогичные из OnMessage, а помещать сообщение в некоторую внутреннюю очередь, которую вы читаете работающими потоками. Если вы просто раскручиваете потоки, у вас будет проблема с обработкой, скажем, нескольких тысяч сообщений в очереди при запуске. Для этой цели у Java есть BlockingQueue . Таким образом, вы можете контролировать, сколько потоков должно быть занято логикой / базой данных. Вам нужно где-то установить предел, сколько сообщений вы можете прочитать из ActiveMQ, прежде чем вам нужно будет ждать, пока бэкэнд не успеет. Хорошим способом сделать это является очередь.

Пример с очередью блокировки.

final BlockingQueue<String> internalQueue = new ArrayBlockingQueue<String>(CAPACITY);

...

new Thread(new Runnable() { // TODO make a named class and schedule as many thread as needed.
        @Override public void run() {
            try {
                while (true) {
                    String msg = internalQueue.take();
                    System.out.println(msg);
                }
            } catch (InterruptedException e) {
                System.out.println("Interrupted.");
            }
        }
    }).start();

...

public void onMessage(Message message) {
         if (message instanceof TextMessage) {
              internalQueue.put(((TextMessage)message).getText());
         }
}

В любом случае, если подумать об этом в другой раз, не лучше ли в любом случае просто сохранить очередь в ActiveMQ? Если процесс занимает слишком много времени, рассмотрите возможность разбиения потока на несколько этапов с очередями между ними.

Пример:

  • очередь -> проверка / проверка ошибок -> очередь -> массовая логика -> очередь -> постоянное хранение базы данных -> очередь -> уведомления
0 голосов
/ 04 мая 2018

Если вы не хотите ждать бизнес-логики и взаимодействия с БД, вы можете перевести работу в другой поток и вернуться.

Но учтите, что если вы перешли на другой поток, вы потеряете контроль над ним.

Я думаю, вы можете начать с CompletableFuture в Java 8:

        YourOffworkService service;
        ....
        public void onMessage(Message msg){
           if (message instance of TextMessage) {
             CompletableFuture.runAsync(() -> { service.process(msg);});
           } 
        }

Редактировать

В Java 7 вы можете сделать Thread или Runnable

public class YourOffworkService extends Thread {
      private Message message;

      public YourOffworkService(Message message) {
       this.message = message;
      }

      public void run() {

       // Here come the business logic / DB interaction
      }
}

Тогда в вашем onMessage():

public void onMessage(Message msg) {
    ...bla bla...
    new YourOffworkService(msg).start();
}
...