IBM MQQueue Самый безопасный способ получить все сообщения - PullRequest
1 голос
/ 04 февраля 2020

У меня есть простая java программа для передачи сообщений из очереди A в очередь B с использованием IBM MQ.

Моя программа работает нормально, но меня беспокоит потеря сообщений. Я знаю, что .get() удаляет сообщение из очереди A. Поэтому, конечно, есть короткий момент, когда я «получил» сообщение из очереди A, и я еще не поместил его в очередь B. Если бы моя программа cra sh в течение этого времени - сообщение будет потеряно.

Для борьбы с этим я записываю текущее сообщение в журналы. Затем, в случае сбоя программы, мы можем вручную ввести сообщение в очередь.

Однако - что если программа вылетит из IOException? Теперь сообщение ушло из очереди A, не было .put() в очереди B и не было записано в журналы.

На мой взгляд, у меня есть два варианта:

Сначала просмотрите сообщение: Я знаю, что могу просмотреть сообщение перед тем, как его "получить", хотя меня немного смущает, как это влияет на количество сообщений в очереди и создает ли оно дубликат и т. д. c.

Запишите сообщение обратно в очередь A: Теоретически, если мы «получим» сообщение из очереди A, у нас не должно возникнуть проблем с помещением «обратно в очередь A, если по какой-то причине мы не можем подключиться к очереди B.

Может ли кто-нибудь сначала уточнить правильный способ просмотра сообщения - или, возможно, предложить третий вариант, о котором я не думал ?

while (true) {

  try {

    // Clear the MQMessage
    theMessage.messageId = MQConstants.MQMI_NONE;
    theMessage.correlationId = MQConstants.MQCI_NONE;

    // Get the message from queue A
    queueA.get(theMessage, gmo);

    // Read the message from queue A
    byte[] messageBytes = new byte[theMessage.getMessageLength()];
    theMessage.readFully(messageBytes);
    String messageText = new String(messageBytes);

    // Store the message to the logs in case of crash

    // Put the message in queue B
    queueB.put(theMessage);

  } catch (MQException e) {

    // Break the loop if we get an MQException
    // Hopefully, it is a reason code 2033 (out of messages)

  } catch (IOException e) {

    // Something went wrong reading the message

  }
}

1 Ответ

4 голосов
/ 04 февраля 2020

Как правило, если вы хотите отслеживать чтение и запись сообщений, вы должны использовать транзакционное чтение / запись.

MQGetMessageOptions gmo = new MQGetMessageOptions();   
gmo.waitInterval = 1000;
gmo.options = MQGMO_WAIT;
gmo.options += MQGMO_FAIL_IF_QUIESCING;
gmo.options += MQGMO_SYNCPOINT;

MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options += MQPMO_SYNCPOINT;

// create message instance
MQMessage message = new MQMessage();
message.correlationId = MQCI_NONE;
message.messageId = MQMI_NONE;

// read message
queueA.get(message, gmo);

// write message
queueB.put(message, pmo);

// commit transaction
qmgr.commit();

В этом случае, если транзакция не будет зафиксирована, все прочитанные сообщения вернутся в исходную очередь, и все записанные сообщения исчезнут из целевых очередей. Возможно, будет хорошей идеей фиксировать не каждое сообщение, а каждые 10 или 100 в зависимости от их количества.

Если вы не собираетесь использовать распределенные транзакции (например, сохранение некоторой информации из сообщений MQ в базе данных), то было бы достаточно. В противном случае я бы рекомендовал перейти на JMS из-за лучшей поддержки транзакций.

...