Как подписаться на очередь MSMQ, но только «посмотреть» сообщение в .Net? - PullRequest
2 голосов
/ 03 мая 2010

У нас есть настройка MSMQ Queue, которая получает сообщения и обрабатывается приложением. Мы бы хотели, чтобы другой процесс подписался на очередь, просто прочитал сообщение и зарегистрировал его содержимое.

У меня это уже есть, проблема в том, что он постоянно заглядывает в очередь. Процессор на сервере, когда он работает, составляет около 40%. Mqsvc.exe работает на 30%, а это приложение работает на 10%. Я предпочел бы иметь что-то, что просто ожидает прихода сообщения, получает уведомление об этом, а затем регистрирует его без постоянного опроса сервера.

    Dim lastid As String
    Dim objQueue As MessageQueue
    Dim strQueueName As String

    Public Sub Main()
        objQueue = New MessageQueue(strQueueName, QueueAccessMode.SendAndReceive)
        Dim propertyFilter As New MessagePropertyFilter
        propertyFilter.ArrivedTime = True
        propertyFilter.Body = True
        propertyFilter.Id = True
        propertyFilter.LookupId = True
        objQueue.MessageReadPropertyFilter = propertyFilter
        objQueue.Formatter = New ActiveXMessageFormatter
        AddHandler objQueue.PeekCompleted, AddressOf MessageFound

        objQueue.BeginPeek()
    end main

    Public Sub MessageFound(ByVal s As Object, ByVal args As PeekCompletedEventArgs)

        Dim oQueue As MessageQueue
        Dim oMessage As Message

        ' Retrieve the queue from which the message originated
        oQueue = CType(s, MessageQueue)

            oMessage = oQueue.EndPeek(args.AsyncResult)
            If oMessage.LookupId <> lastid Then
                ' Process the message here
                lastid = oMessage.LookupId
                ' let's write it out
                log.write(oMessage)
            End If

        objQueue.BeginPeek()
    End Sub

Ответы [ 5 ]

4 голосов
/ 13 мая 2010

Вы пытались использовать MSMQEvent.Arrived для отслеживания сообщений?

Событие Arrived объекта MSMQEvent наступает, когда вызывается метод MSMQQueue.EnableNotification экземпляра объекта MSMQQueue, представляющего открытую очередь, и сообщение найдено или прибывает в соответствующую позицию в очереди.

3 голосов
/ 03 мая 2010

Thread.Sleep (10) между итерациями просмотра может сэкономить вам кучу циклов.

Единственный другой вариант, о котором я могу подумать, - встроить запись в приложение чтения очереди.

1 голос
/ 03 мая 2010

Нет API, который позволял бы вам просматривать каждое сообщение только один раз.

Проблема в том, что BeginPeek немедленно выполняет обратный вызов, если в очереди уже есть сообщение. Поскольку вы не удаляете сообщение (в конце концов, это peek , не принимается!), Когда ваш обратный вызов начинает снова заглядывать, процесс начинается заново, поэтому MessageFound выполняется почти постоянно.

Ваши лучшие варианты - записывать сообщения в писателе или читателе. Журналирование будет работать в течение коротких периодов времени (если вы заботитесь только о полученных сообщениях), но не является долгосрочным решением:

В то время как производительность снижается извлечение сообщений из очереди, которая настроен только на ведение журнала примерно на 20% больше, чем получение сообщения без журналирования, настоящие стоимость неожиданных проблем, вызванных когда работает непроверенная служба MSMQ недостаточно памяти или машина вышла из строя дисковое пространство

0 голосов
/ 04 сентября 2017

ИМХО нужно просто включить ведение журнала в очереди. Затем вы гарантированно сохраняете копию всех сообщений, которые были переданы в очередь, и это не относится к вашей трудоемкой попытке создать собственный механизм для регистрации всего этого.

Намного проще и надежнее регистрировать и удалять зарегистрированные сообщения по расписанию, если вы хотите что-то более удобочитаемое, чем сама очередь (и я, конечно, хотел бы этого). Тогда не имеет значения, насколько быстро работает процесс или нет, вам нужно получать сообщения только один раз, и в целом это гораздо лучший способ решения проблемы.

0 голосов
/ 04 мая 2010

Это работает для меня. Он блокирует поток во время ожидания сообщения. Каждый цикл цикла проверяет член класса _bServiceRunning, чтобы увидеть, должен ли поток прерваться.

    private void ProcessMessageQueue(MessageQueue taskQueue)
    {
        // Set the formatter to indicate body contains a binary message:
        taskQueue.Formatter = new BinaryMessageFormatter();

        // Specify to retrieve selected properties.
        MessagePropertyFilter myFilter = new MessagePropertyFilter();
        myFilter.SetAll();
        taskQueue.MessageReadPropertyFilter = myFilter;

        TimeSpan tsQueueReceiveTimeout = new TimeSpan(0, 0, 10); // 10 seconds

        // Monitor the MSMQ until the service is stopped:
        while (_bServiceRunning)
        {
            rxMessage = null;

            // Listen to the queue for the configured duration:
            try
            {
                // See if a message is available, and if so remove if from the queue if any required
                // web service is available:
                taskQueue.Peek(tsQueueReceiveTimeout);

                // If an IOTimeout was not thrown, there is a message in the queue
                // Get all the messages; this does not remove any messages
                Message[] arrMessages = taskQueue.GetAllMessages();

                // TODO: process the message objects here;
                //       they are copies of the messages in the queue
                //       Note that subsequent calls will return the same messages if they are
                //       still on the queue, so use some structure defined in an outer block
                //       to identify messages already processed.

            }
            catch (MessageQueueException mqe)
            {
                if (mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    // The peek message time-out has expired; there are no messages waiting in the queue
                    continue; // at "while (_bServiceRunning)"
                }
                else
                {
                    ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, mqe);
                    break; // from "while (_bServiceRunning)"
                }
            }
            catch (Exception ex)
            {
                ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, ex);
                break; // from "while (_bServiceRunning)"
            }
        }

    } // ProcessMessageQueue()
...