Провайдер IBM MQ для JMS: как автоматически откатывать сообщения? - PullRequest
0 голосов
/ 07 августа 2020

Рабочие версии в приложении

  • Версия IBM AllClient: 'com.ibm.mq:com.ibm.mq.allclient:9.1.1.0'
  • org.springframework: spring-jms: 4.3.9.RELEASE
  • javax.jms: javax.jms-api: 2.0.1

Мое требование заключается в том, что в случае сбоя обработки сообщения из-за того, что потребитель недоступен (например, БД недоступен), сообщение остается в очереди или помещается обратно в очередь (если это вообще возможно). Это связано с тем, что порядок сообщений важен, сообщения должны использоваться в том же порядке, в котором они были получены. Приложение Java является однопоточным.

Я пробовал следующее

@Override
public void onMessage(Message message)
{
   try{
      if(message instanceOf Textmessage)
      {
      }
   
      :

      : 
      throw new Exception("Test");// Just to test the retry
    }
    catch(Exception ex)
    {
            try
            {
                int temp = message.getIntProperty("JMSXDeliveryCount");
                throw new RuntimeException("Redlivery attempted ");
                // At this point, I am expecting JMS to put the message back into the queue.
                // But it is actually put into the Bakout queue.
            }
            catch(JMSException ef)
            {
                String temp = ef.getMessage();
            }

    }
}

Я установил это в своей spring. xml для bean-компонента jmsContainer.

    <property name="sessionTransacted" value="true" />

Что не так с приведенным выше кодом?

И если вернуть сообщение в очередь нецелесообразно, как можно просмотреть сообщение, обработать его и, в случае успеха, вытащить сообщение ( так он потребляется и больше не в очереди)? Поддерживается ли этот сценарий в поставщике IBM для JMS?

В локальной очереди IBM MQ есть BOTHRESH(1).

Ответы [ 3 ]

3 голосов
/ 14 августа 2020

Чтобы сохранить порядок сообщений, одним из подходов может быть временная остановка приемника сообщений как часть вашей стратегии отката. Глядя на Spring Boot do c для DefaultMessageListenerContainer, есть метод stop(Runnable callback). Я экспериментировал с использованием этого при откате следующим образом.

Чтобы обеспечить однопоточность моего слушателя, на моем DefaultJmsListenerContainerFactory я установил containerFactory.setConcurrency("1").

В моем слушателе я установил an id

@JmsListener(destination = "DEV.QUEUE.2", containerFactory = "listenerTwoFactory", concurrency="1", id="listenerTwo")

И получить экземпляр DefaultMessageListenerContainer.

JmsListenerEndpointRegistry reg = context.getBean(JmsListenerEndpointRegistry.class);
DefaultMessageListenerContainer mlc = (DefaultMessageListenerContainer) reg.getListenerContainer("listenerTwo");

Для тестирования я проверяю JMSXDeliveryCount и генерирую исключение для отката.

retryCount = Integer.parseInt(msg.getStringProperty("JMSXDeliveryCount"));
if (retryCount < 5) {
    throw new Exception("Rollback test "+retryCount);
}

При обработке улова Listener я вызываю stop(Runnable callback) в экземпляре DefaultMessageListenerContainer и передаю новый класс ContainerTimedRestart, как определено ниже.

//catch processing here and decide to rollback
mlc.stop(new ContainerTimedRestart(mlc,delay));
System.out.println("#### "+getClass().getName()+" Unable to process message.");
throw new Exception();

ContainerTimedRestart extends Runnable и DefaultMessageListenerContainer отвечает за вызов метода run() после завершения вызова остановки.

public class ContainerTimedRestart implements Runnable {

  //Container instance to restart.
  private DefaultMessageListenerContainer theMlc;

  //Default delay before restart in mills.
  private long theDelay = 5000L;

  //Basic constructor for testing.
  public ContainerTimedRestart(DefaultMessageListenerContainer mlc, long delay) {
    theMlc = mlc;
    theDelay = delay;
  }

  public void run(){
    //Validate container instance.

    try {
      System.out.println("#### "+getClass().getName()+"Waiting for "+theDelay+" millis.");
      Thread.sleep(theDelay);
      System.out.println("#### "+getClass().getName()+"Restarting container.");
      theMlc.start();
      System.out.println("#### "+getClass().getName()+"Container started!");
    } catch (InterruptedException ie) {
      ie.printStackTrace();

      //Further checks and ensure container is in correct state.
      //Report errors.
    }
  }

Я загрузил в свою очередь три сообщения с полезными данными «a», «b» и «c» соответственно и запустил прослушиватель.

Проверяя DEV.QUEUE.2 в моем диспетчере очередей, я вижу IPPROCS(1), подтверждающее, что только один дескриптор приложения имеет открытую очередь. Сообщения обрабатываются по порядку после того, как каждое повторяется пять раз и с 5-секундной задержкой между попытками отката.

1 голос
/ 17 августа 2020

В Spring JMS вы можете определить свой собственный контейнер. Один контейнер создается для одного места назначения Jms. Мы должны запустить однопоточный прослушиватель JMS, чтобы поддерживать порядок сообщений, чтобы эта работа установила параллелизм на 1.

Мы можем спроектировать наш контейнер так, чтобы он возвращал null, когда он обнаруживает ошибки, после сбоя все принимаемые вызовы должен возвращать null, чтобы сообщения не опрашивались от места назначения, пока место назначения снова не станет активным. Мы можем поддерживать активное состояние, используя временную метку, которая может составлять простые миллисекунды. Образца конфигурации JMS должно быть достаточно, чтобы добавить отсрочку. Вы можете добавить небольшой сон вместо непрерывного возврата null из метода receiveMessage, например, засыпать в течение 10 секунд перед выполнением следующего вызова, это сэкономит некоторые ресурсы ЦП.

@Configuration
@EnableJms
public class JmsConfig {

  @Bean
  public JmsListenerContainerFactory<?> jmsContainerFactory(ConnectionFactory connectionFactory,
      DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory() {
      @Override
      protected DefaultMessageListenerContainer createContainerInstance() {
        return new DefaultMessageListenerContainer() {
          private long deactivatedTill = 0;

          @Override
          protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
            if (deactivatedTill < System.currentTimeMillis()) {
              return receiveFromConsumer(consumer, getReceiveTimeout());
            }
            logger.info("Disabled due to failure :(");
            return null;
          }

          @Override
          protected void doInvokeListener(MessageListener listener, Message message)
              throws JMSException {
            try {
              super.doInvokeListener(listener, message);
            } catch (Exception e) {
              handleException(message);
              throw e;
            }
          }

          private long getDelay(int retryCount) {
            if (retryCount <= 1) {
              return 20;
            }
            return (long) (20 * Math.pow(2, retryCount));
          }

          private void handleException(Message msg) throws JMSException {
            if (msg.propertyExists("JMSXDeliveryCount")) {
              int retryCount = msg.getIntProperty("JMSXDeliveryCount");
              deactivatedTill = System.currentTimeMillis() + getDelay(retryCount);
            }
          }

          @Override
          protected void doInvokeListener(SessionAwareMessageListener listener, Session session,
              Message message)
              throws JMSException {
            try {
              super.doInvokeListener(listener, session, message);
            } catch (Exception e) {
              handleException(message);
              throw e;
            }
          }
        };
      }
    };
    // This provides all boot's default to this factory, including the message converter
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's default if necessary.
    return factory;
  }
}
1 голос
/ 17 августа 2020

Классы IBM MQ для JMS имеют встроенную обработку подозрительных сообщений. Эта обработка основана на настройке QLOCAL BOTHRESH, что означает Backout Threshold. Каждое сообщение IBM MQ имеет «заголовок», называемый MQMD (дескриптор сообщения MQ). Одно из полей в MQMD - BackoutCount. Значение BackoutCount для нового сообщения по умолчанию - 0. Каждый раз, когда сообщение откатывается в очередь, это счетчик увеличивается на 1. Откат может происходить либо от указанного c вызова до rollback(), либо из-за того, что приложение было отключено от MQ до вызова commit () (из-за из-за сетевой проблемы, например, или сбоя приложения).

Обработка ядовитых сообщений отключена, если вы установили BOTHRESH(0).

Если BOTHRESH> = 1, то подозрительное сообщение обработка включена, и когда классы IBM MQ для JMS считывают сообщение из очереди, они проверяют, соответствует ли BackoutCount> = ОБА SH. Если сообщение подходит для обработки подозрительного сообщения, оно будет перемещено в очередь, указанную в атрибуте BOQNAME, если этот атрибут пуст или приложение по какой-то причине не имеет доступа к PUT в эту очередь, оно вместо этого попытается для помещения сообщения в очередь, указанную в атрибуте DEADQ диспетчеров очередей, если оно не может быть помещено ни в одно из этих мест, оно будет возвращено в очередь.

Более подробную информацию о классах IBM MQ для обработки подозрительных сообщений JMS можно найти на странице Центра знаний IBM MQ v9.1. Разработка приложений> Разработка приложений JMS и Java> Использование классов IBM MQ для JMS> Написание Классы IBM MQ для приложений JMS> Обработка подозрительных сообщений в классах IBM MQ для JMS

...