Можно ли потерять сообщения, используя MSMQ MessageQueue.Peek с таймаутом? - PullRequest
5 голосов
/ 14 декабря 2009

У меня сейчас проблема с потерей сообщения. Эта ошибка возникает редко, но случается достаточно часто, чтобы раздражать. Вот контекст вопроса:

  • Я включил журнал сообщений на goldmine_service_queue, MSMQ на сервере Windows 2003.
  • Я могу доказать, что сообщение вставляется в goldmine_service_queue, так как сообщение появляется в журнале сообщений. Эта информация содержит информацию о времени исчезновения сообщения.
  • Функции регистрации используют http://logging.apache.org/log4net/index.html
  • В журналах не отображаются ошибки.
  • Рабочая функция (показанная ниже) выполняется в потоке службы Windows. Он отвечает за просмотр сообщений (рабочих элементов) из очереди и их обработку.
  • Из журналов я сильно подозреваю, что моя проблема может быть связана с MessageQueue.Peek и поведением тайм-аута.

Возможно ли, что таймаут и получение сообщения происходят одновременно? Есть ли лучший способ обработать проверку остановки службы, чтобы избежать этой ошибки?

        private void workerFunction()
    {

        logger.Info("Connecting to queue: " + Settings.Default.goldmine_service_queue);
        MessageQueue q = new MessageQueue(Settings.Default.goldmine_service_queue);
        q.Formatter = new ActiveXMessageFormatter();

        while (serviceStarted)
        {

            Message currentMessage = null;

            try
            {
                currentMessage = q.Peek(new TimeSpan(0,0,30));
            }
            catch (System.Messaging.MessageQueueException mqEx)
            {
                if (mqEx.ToString().Contains("Timeout for the requested operation has expired"))
                {
                    logger.Info("Check for service stop request");
                }
                else
                {
                    logger.Error("Exception while peeking into MSMQ: " + mqEx.ToString());
                }
            }
            catch (Exception e)
            {
                logger.Error("Exception while peeking into MSMQ: " + e.ToString());
            }

            if (currentMessage != null)
            {

                logger.Info(currentMessage.Body.ToString());
                try
                {
                    ProcessMessage(currentMessage);
                }
                catch (Exception processMessageException)
                {
                    logger.Error("Error in process message: " + processMessageException.ToString());
                }

                //Remove message from queue.
                logger.Info("Message removed from queue.");
                q.Receive();
                //logPerformance(ref transCount, ref startTime);
            }


        }//end while

        Thread.CurrentThread.Abort();

    }

Ответы [ 3 ]

5 голосов
/ 14 декабря 2009

Я не думаю, что какие-либо сообщения должны быть пропущены из-за быстрого обзора, но вы работаете очень странным образом с большим количеством возможностей для условий гонки.

Почему бы просто не получить сообщение и не передать его ProcessMessage (если ProcessMessage не удастся, вы все равно выполняете чтение). Если вам нужно обработать несколько получателей, выполните прием в транзакции MSMQ, чтобы сообщение было недоступно для других получателей, но не удалялось из очереди, пока транзакция не будет зафиксирована.

Кроме того, вместо опроса очереди, почему бы не сделать асинхронный прием и позволить пулу потоков обрабатывать завершение (где вы должны вызвать EndReceive). Это избавляет от необходимости связывать поток, и вам не нужно отключать службу в особом случае (закройте очередь сообщений и затем вызовите MessageQueue.ClearConnectionCache();).

Кроме того, прерывание потока - это действительно плохой способ выхода, просто вернитесь из функции запуска потока.

3 голосов
/ 16 декабря 2009

Просто несколько комментариев, чтобы уточнить, как MSMQ работает здесь.

"Я могу доказать, что сообщение вставляется в goldmine_service_queue, так как сообщение появляется в журнале сообщений.

Сообщение попадает в очередь журнала, когда исходное сообщение удаляется из очереди goldmine_service_queue. Таким образом, вы можете сказать, что сообщение было успешно доставлено в очередь и успешно удалено из очереди.

«Я сильно подозреваю, что моя проблема может быть связана с MessageQueue.Peek и поведением тайм-аута».

Peek ничего не делает для удаления сообщения из очереди. Только "q.Receive ();" делает это В вашем коде нет явной связи между просматриваемым сообщением и полученным. "Q.Receive ();" просто говорит "получить сообщение из верхней части очереди". В многопоточной среде можно ожидать непоследовательного чтения сообщений - некоторые могут быть просмотрены и обработаны несколько раз. Вы должны получить идентификатор сообщения Peeked и использовать ReceiveByID, чтобы вы могли только получить сообщение Peeked.

1 голос
/ 14 декабря 2009

Я хочу быть уверен, что изменение currentMessage = q.Peek(new TimeSpan(0,0,30)); на currentMessage = q.Receive(); решит вашу проблему. Я использую MSMQ для передачи сообщений в том же контексте, но использую только peek (и ожидаю исключение тайм-аута), чтобы определить, пуста ли их очередь. Вызов Receive блокируется, поэтому планируйте соответственно.

Редактировать: также для перехвата исключений - вы можете проверить, является ли ваше исключение тайм-аутом, сравнив код ошибки.

mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout

где mqe - исключение из очереди сообщений. Вам может понравиться это лучше, чем сравнение строк.

...