Как удалить сообщение из очереди по условию? - PullRequest
1 голос
/ 24 июня 2019

Я получаю сообщения от IBM MQ. Как удалить сообщение из очереди по условию?

Я пытался установить gmo.options = CMQC.MQGMO_MSG_UNDER_CURSOR; но это мне не помогло.

MQQueue queue = queueManager.accessQueue(e.getIbmQueue().trim(), CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_FAIL_IF_QUIESCING | CMQC.MQOO_INQUIRE, null, null, null);

MQGetMessageOptions gmo = new MQGetMessageOptions();
                        gmo.options = MQGMO_ALL_MSGS_AVAILABLE | MQGMO_WAIT | MQGMO_PROPERTIES_AS_Q_DEF | MQGMO_FAIL_IF_QUIESCING | MQOO_INPUT_AS_Q_DEF | MQGMO_SYNCPOINT;
gmo.matchOptions = MQMO_MATCH_CORREL_ID;
gmo.waitInterval = 50000;
byte[] body = null;
while (true) {
    try {
        queue.get(msg, gmo);
        body = new byte[msg.getMessageLength()];
        String businessIdFromIbm = msg.getStringProperty("usr.uuid");
        if (businessIdFromIbm.equals("123")) {
            //delete message
        }
        msg.clearMessage();
}

Ответы [ 2 ]

0 голосов
/ 25 июня 2019

Вы можете прочитать мой полный блог здесь .Вот его дайджест-версия для читателя.

Существует неправильное представление о фильтрации сообщений на основе MQ / JMS.Некоторые люди думают, что администратор очередей IBM MQ делает что-то особенное для приложений JMS, а не для простых приложений Java или C / C ++ / C # / COBOL.Это просто НЕ правда.Прочтите первый абзац селекторов сообщений в JMS из Центра знаний для получения дополнительной информации.Примечание. Для приложений C / C ++ / C # / COBOL они могут использовать структуру SelectionString из MQOD для выбора сообщений.

Я создал класс селектора сообщений для приложений POJO MQ.

Здесьфрагмент кода MQTest12MS, который показывает, как использовать класс MessageSelector.

ms = new MessageSelector(qMgr);
ms.openQueue(inputQName);
ms.setFilter("SomeNum", MessageSelector.Conditions.GREATER_THAN_EQUAL, 123);

while (true)
{
   receiveMsg = ms.getMessage(startAtBeginning);

   // got the message, now go and do something with it.

   // set flag to continue rather than restart at the beginning.
   startAtBeginning = false;
}
ms.closeQueue();

Вот класс MessageSelector.

import java.io.IOException;

import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;

/**
 * Class Name
 *  MessageSelector
 *
 * Description
 *  This java class will retrieve messages from a queue based on a filter.
 *
 * @author Roger Lacroix
 * @version 1.0.0
 * @license Apache 2 License
 */
public class MessageSelector
{
   public enum Conditions
   {
      EQUAL, NOT_EQUAL, LESS_THAN, LESS_THAN_EQUAL, GREATER_THAN, GREATER_THAN_EQUAL;
   }

   private MQQueueManager  qMgr = null;
   private MQQueue         inQ = null;
   private String          filterName = null;
   private Conditions      filterCondition;
   private Object          filterValue = null;

   /**
    * The constructor
    * @param qMgr - must have a valid/active connection to the queue manager
    */
   public MessageSelector(MQQueueManager qMgr)
   {
      super();
      this.qMgr = qMgr;
   }

   /**
    * Open the queue for both browsing and destructive gets.
    * @param qName
    * @throws MQException
    */
   public void openQueue(String qName) throws MQException
   {
      inQ = qMgr.accessQueue(qName, CMQC.MQOO_INQUIRE + CMQC.MQOO_BROWSE + CMQC.MQOO_FAIL_IF_QUIESCING + CMQC.MQOO_INPUT_SHARED);
   }

   /**
    * Close the queue.
    * @throws MQException
    */
   public void closeQueue() throws MQException
   {
      if (inQ != null)
         inQ.close();
   }

   /**
    * Set the filter name, condition and value.
    * @param name
    * @param condition
    * @param value
    * @throws IllegalArgumentException
    */
   public void setFilter(String name, Conditions condition, Object value) throws IllegalArgumentException
   {
      if (name == null)
         throw new IllegalArgumentException("Filter name cannot be null.");
      else if ("".equals(name))
         throw new IllegalArgumentException("Filter name cannot be blank.");
      else if (value == null)
         throw new IllegalArgumentException("Filter value cannot be null.");

      if ( (value instanceof String) || (value instanceof Boolean) ||
           (value instanceof Byte)   || (value instanceof Byte[])  )
       {
          if ( (Conditions.EQUAL != condition) && (Conditions.NOT_EQUAL != condition) )
          {
             throw new IllegalArgumentException("Filter condition can only be EQUAL or NOT_EQUAL.");
          }
       }
       else if ( (value instanceof Integer) || (value instanceof Long) ||
                 (value instanceof Double)  || (value instanceof Float) )
       {
          if ( (Conditions.EQUAL != condition) && (Conditions.NOT_EQUAL != condition) &&
               (Conditions.LESS_THAN != condition) && (Conditions.LESS_THAN_EQUAL != condition) &&
               (Conditions.GREATER_THAN != condition) && (Conditions.GREATER_THAN_EQUAL != condition) )
          {
             throw new IllegalArgumentException("Filter condition must be one of the following: EQUAL, NOT_EQUAL, LESS_THAN, LESS_THAN_EQUAL, GREATER_THAN, GREATER_THAN_EQUAL.");
          }
       }
       else
       {
          throw new IllegalArgumentException("Unknown Object type for Filter value.");
       }

      /**
       * Pass the checks, save the values
       */
      this.filterName = name;
      this.filterCondition = condition;
      this.filterValue = value;
   }

   /**
    * Retrieve the next matching message from the queue.
    * @param reset - Start over from the beginning of the queue.
    * @return
    * @throws MQException
    * @throws IOException
    */
   public MQMessage getMessage(boolean reset) throws MQException, IOException
   {
      MQGetMessageOptions gmo = new MQGetMessageOptions();
      if (reset)
         gmo.options = CMQC.MQGMO_BROWSE_FIRST + CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING;
      else
         gmo.options = CMQC.MQGMO_BROWSE_NEXT + CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING;
      MQMessage getMsg = null;

      while (true)
      {
         getMsg = new MQMessage();

         inQ.get(getMsg, gmo);

         if (performConditionalTest(getMsg))
         {
            deleteMessage();
            break;
         }

         gmo.options = CMQC.MQGMO_BROWSE_NEXT + CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING;
      }

      return getMsg;
   }

   /**
    * Handle the conditional testing of the value.
    * @param getMsg
    * @return true/false
    */
   private boolean performConditionalTest(MQMessage getMsg)
   {
      boolean flag = false;

      try
      {
         if (filterValue instanceof String)
         {
            String value = getMsg.getStringProperty(filterName);
            if (value != null)
            {
               if ( (Conditions.EQUAL == filterCondition) && (((String)filterValue).equals(value)) )
                  flag = true;
               else if ( (Conditions.NOT_EQUAL == filterCondition) && (!(((String)filterValue).equals(value))) )
                  flag = true;
            }
         }
         else if (filterValue instanceof Integer)
         {
            int value = getMsg.getIntProperty(filterName);

            if ( (Conditions.EQUAL == filterCondition) && (value == (Integer)filterValue) )
               flag = true;
            else if ( (Conditions.NOT_EQUAL == filterCondition) && (value != (Integer)filterValue) )
               flag = true;
            else if ( (Conditions.LESS_THAN == filterCondition) && (value < (Integer)filterValue) )
               flag = true;
            else if ( (Conditions.LESS_THAN_EQUAL == filterCondition) && (value <= (Integer)filterValue) )
               flag = true;
            else if ( (Conditions.GREATER_THAN == filterCondition) && (value > (Integer)filterValue) )
               flag = true;
            else if ( (Conditions.GREATER_THAN_EQUAL == filterCondition) && (value >= (Integer)filterValue) )
               flag = true;
         }
         else if (filterValue instanceof Long)
         {
            long value = getMsg.getLongProperty(filterName);

            if ( (Conditions.EQUAL == filterCondition) && (value == (Long)filterValue) )
               flag = true;
            else if ( (Conditions.NOT_EQUAL == filterCondition) && (value != (Long)filterValue) )
               flag = true;
            else if ( (Conditions.LESS_THAN == filterCondition) && (value < (Long)filterValue) )
               flag = true;
            else if ( (Conditions.LESS_THAN_EQUAL == filterCondition) && (value <= (Long)filterValue) )
               flag = true;
            else if ( (Conditions.GREATER_THAN == filterCondition) && (value > (Long)filterValue) )
               flag = true;
            else if ( (Conditions.GREATER_THAN_EQUAL == filterCondition) && (value >= (Long)filterValue) )
               flag = true;
         }
         else if (filterValue instanceof Double)
         {
            double value = getMsg.getDoubleProperty(filterName);

            if ( (Conditions.EQUAL == filterCondition) && (value == (Double)filterValue) )
               flag = true;
            else if ( (Conditions.NOT_EQUAL == filterCondition) && (value != (Double)filterValue) )
               flag = true;
            else if ( (Conditions.LESS_THAN == filterCondition) && (value < (Double)filterValue) )
               flag = true;
            else if ( (Conditions.LESS_THAN_EQUAL == filterCondition) && (value <= (Double)filterValue) )
               flag = true;
            else if ( (Conditions.GREATER_THAN == filterCondition) && (value > (Double)filterValue) )
               flag = true;
            else if ( (Conditions.GREATER_THAN_EQUAL == filterCondition) && (value >= (Double)filterValue) )
               flag = true;
         }
         else if (filterValue instanceof Float)
         {
            float value = getMsg.getFloatProperty(filterName);

            if ( (Conditions.EQUAL == filterCondition) && (value == (Float)filterValue) )
               flag = true;
            else if ( (Conditions.NOT_EQUAL == filterCondition) && (value != (Float)filterValue) )
               flag = true;
            else if ( (Conditions.LESS_THAN == filterCondition) && (value < (Float)filterValue) )
               flag = true;
            else if ( (Conditions.LESS_THAN_EQUAL == filterCondition) && (value <= (Float)filterValue) )
               flag = true;
            else if ( (Conditions.GREATER_THAN == filterCondition) && (value > (Float)filterValue) )
               flag = true;
            else if ( (Conditions.GREATER_THAN_EQUAL == filterCondition) && (value >= (Float)filterValue) )
               flag = true;
         }
         else if (filterValue instanceof Boolean)
         {
            Boolean value = getMsg.getBooleanProperty(filterName);
            if ( (value != null) && ((Boolean)filterValue == value) )
               flag = true;
         }
         else if (filterValue instanceof Byte)
         {
            byte value = getMsg.getByteProperty(filterName);
            if ((Byte)filterValue == value)
               flag = true;
         }
         else if (filterValue instanceof Byte[])
         {
            byte[] value = getMsg.getBytesProperty(filterName);
            if ( (value != null) && (java.util.Arrays.equals((byte[])filterValue, value)) )
               flag = true;
         }
      }
      catch (Exception e)
      {}

      return flag;
   }

   /**
    * Delete the message that the cursor is pointing to.
    */
   private void deleteMessage()
   {
      MQMessage deleteMsg = new MQMessage();
      MQGetMessageOptions gmo = new MQGetMessageOptions();
      gmo.options = CMQC.MQGMO_MSG_UNDER_CURSOR + CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING + CMQC.MQGMO_ACCEPT_TRUNCATED_MSG;

      /**
       * don't need it - because we already have the message
       * just delete it.
       */
      try
      {
         inQ.get(deleteMsg, gmo, 1);  // only get 1 byte - who cares right!!
      }
      catch (MQException e)
      {}
   }
}

Обновление 2019/06/25: я обновилМетод setFilter класса MessageSelector.

0 голосов
/ 24 июня 2019

Вызов queue.get в приведенном выше примере является деструктивным: сообщение будет логически удалено из очереди во время этого вызова.

Вы используете синхронизирующую точку, поэтому вам необходимо вызватьqueuemanager.commit (), чтобы завершить единицу работы (или точка синхронизации будет зафиксирована автоматически, если вы вызовете queuemanager.disconnect ()).

MQGMO_MSG_UNDER_CURSOR действителен только в том случае, если вы открыли очередь для просмотра (CMQC.MQOO_BROWSE) и создали по крайней мере один просмотр для этой очереди, так что ваше приложение имеет правильное сообщение под курсором обзора.

Если вы хотите просмотреть сообщение, а затем удалить его со вторым queue.get, чтобы удалить сообщение под курсором обзора, ваш первый queue.get должен будет указать либо MQGMO_BROWSE_FIRST, либо MQGMO_BROWSE_NEXT в ГМО.опции;и затем второй queue.get должен будет предоставить MQGMO_MSG_UNDER_CURSOR в качестве параметров соответствия без любого набора параметров просмотра, чтобы разрушительно удалить просмотренное сообщение.

MQQueue queue = queueManager.accessQueue(e.getIbmQueue().trim(), CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_BROWSE | CMQC.MQOO_FAIL_IF_QUIESCING | CMQC.MQOO_INQUIRE, null, null, null);

MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = CMQC.MQGMO_ALL_MSGS_AVAILABLE | CMQC.MQGMO_WAIT | CMQC.MQGMO_PROPERTIES_AS_Q_DEF | CMQC.MQGMO_FAIL_IF_QUIESCING | CMQC.MQGMO_BROWSE_NEXT;
gmo.matchOptions = MQMO_MATCH_CORREL_ID;
gmo.waitInterval = 50000;
byte[] body = null;
while (true) {
    try {
        queue.get(msg, gmo);
        body = new byte[msg.getMessageLength()];
        String businessIdFromIbm = msg.getStringProperty("usr.uuid");
        if (businessIdFromIbm.equals("123")) {
            //delete message
            MQGetMessageOptions gmo2 = new MQGetMessageOptions();
            gmo2.options =  CMQC.MQGMO_MSG_UNDER_CURSOR | CMQC.MQGMO_FAIL_IF_QUIESCING | CMQC.MQGMO_SYNCPOINT;
            queue.get(msg, gmo2);
            // Be prepared to handle the case where the message has been removed by another application (or expired) and so you receive MQRC 2033
            queueManager.commit();
        }
        msg.clearMessage();
}

Альтернативный подход (что потребовало бы меньше обращений к администратору очередей, если это является проблемой), было бы использовать JMS API для MQ и использовать селектор на usr.uuid, так что только сообщения, для которых установлено значение 123, возвращаются вприложение.Смотри https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.dev.doc/q031980_.htm

...