IBM MQ - чтение сообщений и вызов транзакционных сервисов - PullRequest
0 голосов
/ 30 мая 2019

Я работаю над потребителем для сообщений IBM MQ, я хочу сделать каждую обработку сообщений транзакционной после прочтения с подтверждением.

Когда я читаю каждое сообщение из IBM MQ, мне нужно вызвать от 4 до 5 различных служб отдыха. Будет около 1500 - 2000 вставок в разных таблицах через сервисы.

Если по какой-либо причине происходит сбой любого из сервисов, я хочу откатить предыдущие вставки, произошедшие при обработке этого конкретного сообщения, и оставить сообщение в очереди.

Как я могу добиться того же? Я совершенно новичок в IBM MQs / Jms

Я планирую сделать это, просматривая сообщения, используя

QueueBrowser queueBrowser = context.createBrowser(queue, "JMSCorrelationID='ID:c9d5e2d7c5c3e3c9d6d54040404040404040404040404040c9d5e2d7c5c3e3c9d6d54040404040404040404040404040'");

1 Ответ

1 голос
/ 31 мая 2019

Если все эти очереди находятся в одном MQ Queue Manager, вы должны использовать для этого «локальные транзакции JMS». Поэтому создайте соединение IBM MQ JMS и сделайте сеанс транзакцией (первый аргумент установлен на true ):

session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

Примечание. Второй аргумент игнорируется, если первый аргумент имеет значение true.

Создайте свои очереди JMS и получателя, а затем прочитайте ваше первое сообщение, например:

Message msg = msgConsumer.receive(100);

Это неявно запускает MQ-транзакцию для первого полученного сообщения, если нет текущей транзакции.

Затем выполните обработку и, если все прошло хорошо, вызовите commit.

Если нет, откатите транзакцию, и вы снова увидите все откатанные сообщения. Так что это может сработать примерно так:

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer msgConsumer = session.createConsumer(destination, null);
while( !isStopped() ) {

  try {

    Message msg = msgConsumer.receive(100);
    if( msg!=null ) { 
      ... call your REST services ...
      session.commit();
    }
    ... test for end condition ...
  }
  catch (Exception e) {
    ... error handling ...
    session.rollback();
  }
}
...