Правильное использование очереди сообщений на 2008R2 - PullRequest
4 голосов
/ 24 марта 2011

Я не программист, но я пытаюсь помочь им, давая им некоторое руководство.У нас больше нет собственных знаний по MSMQ.Мы пытаемся использовать это для интеграции некоторых функций с приложением планирования.

Приложение планирования запускает работу, выполняя веб-вызов с использованием специально созданной библиотеки DLL.DLL вызывает вебурл.Веб-приложение выполнит свою задачу и отправит на веб-сайт обновления о выполненной задаче.Сайт пишет сообщение в очередь.DLL, вызвавшая сайт, отслеживает очередь сообщений с меткой, назначенной этому заданию.Когда он получает сообщение об окончательном статусе, он закрывается.

Мы получаем следующее сообщение каждые несколько часов.Мы выполняем около 100 заданий в час, которые используют этот метод.В приведенном внизу коде jobid соответствует метке для сообщения в очереди сообщений.Каждому заданию выдается идентификатор задания в начале, и он будет использовать его в качестве метки для каждого сообщения, которое оно отправляет в msmq для этого задания.

 System.Messaging.MessageQueueException (0x80004005): Message that the cursor is currently pointing to has been removed from the queue by another process or by another call to Receive without the use of this cursor.
  at System.Messaging.MessageQueue.ReceiveCurrent(TimeSpan timeout, Int32 action, CursorHandle cursor, MessagePropertyFilter filter, MessageQueueTransaction internalTransaction, MessageQueueTransactionType transactionType)
  at System.Messaging.MessageEnumerator.get_Current() 

Вот код для него.

  while ( running )
        {
            // System.Console.WriteLine( "Begin Peek" );
            messageQueue.Peek();
            //System.Console.WriteLine( "End Peek" );
            messageQueue.MessageReadPropertyFilter.SetAll();

            using ( MessageEnumerator enumerator = messageQueue.GetMessageEnumerator2() )
            {
                enumerator.Reset();

                while ( enumerator.MoveNext() )
                {
                    Message msg = enumerator.Current;

                    if ( msg.Label.Equals( this.jobid ) )
                    {
                        StringBuilder sb = new StringBuilder();
                        /*
                        try
                        {
                            sb.Append( "Message Source: " );
                            //sb.Append( msg.SourceMachine );
                            sb.Append( " Sent: " );
                            sb.Append( msg.SentTime );
                            sb.Append( " Label " );
                            sb.Append( msg.Label );
                            sb.Append( " ID: " );
                            sb.Append( msg.Id );
                            sb.Append( " CorrelationID: " );
                            sb.Append( msg.CorrelationId );
                            sb.Append( " Body Type: " );
                            sb.Append( msg.BodyType );
                        }
                        catch ( Exception )
                        {
                            throw;
                        }
                        finally
                        {
                            System.Console.WriteLine( sb.ToString() );
                        }
                        */
                        //System.Console.WriteLine( "Receiving Message started" );
                        using ( Message message = messageQueue.ReceiveById( msg.Id ) )
                        {
                            //System.Console.WriteLine( "Receiving Message Complete" );
                            //sb = new StringBuilder();
                            string bodyText = string.Empty;

                            try
                            {
                                System.IO.StringWriter sw = new System.IO.StringWriter( sb );
                                System.IO.StreamReader sr = new System.IO.StreamReader( message.BodyStream );

                                while ( !sr.EndOfStream )
                                {
                                    sw.WriteLine( sr.ReadLine() );
                                }
                                sr.Close();
                                sw.Close();
                                bodyText = ( string ) FromXml( sb.ToString(), typeof( string ) );
                                int indx = bodyText.IndexOf( ',' );
                                string tokens = bodyText.Substring( indx + 1 );
                                indx = tokens.IndexOf( ',' );
                                string command = tokens.Substring( 0, indx );
                                tokens = tokens.Substring( indx + 1 );
                                if ( command.Equals( COMMAND_STARTED ) )
                                {
                                    System.Console.WriteLine( "STARTED " + tokens );
                                }
                                else if ( command.Equals( COMMAND_UPDATE ) )
                                {
                                    System.Console.WriteLine( tokens );
                                }
                                else if ( command.Equals( COMMAND_ENDED_OK ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Success" );
                                    finalResults = new FinalResults( 0, 0, "Success" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_WARNING ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Warning Issued" );
                                    finalResults = new FinalResults( 1, 1, "Warning" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_FAIL ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Failure" );
                                    finalResults = new FinalResults( 2, 16, "Failure" );
                                    running = false;
                                }
                            }
                            catch ( Exception )
                            {
                                throw;
                            }
                            finally
                            {
                                //System.Console.WriteLine( "Body: " + bodyText );
                            }
                        }
                    }
                }
            }
        }

        return finalResults;
    }

    MessageQueue messageQueue = null;
    string webServiceURL = "";
    Dictionary<string, string> parms = new Dictionary<string, string>();
    string jobid = "NONE";

Ответы [ 3 ]

3 голосов
/ 24 марта 2011

Объяснение kprobst, вероятно, происходит.Даже если вы видите, что это конкретное сообщение находится в очереди, если другое приложение (или другой экземпляр того же приложения) выбирает (любое) сообщение из этой очереди, это приведет к недействительности курсора.

По своей сути этокод не предназначен для работы, если несколько процессов подают из одной очереди.

3 голосов
/ 24 марта 2011

Обычно это означает, что сообщение, которое вы получаете (), удаляется чем-то другим, прежде чем операция приема может быть завершена. Другое приложение или другой поток в том же процессе, что и ваш код, с использованием другой ссылки на очередь.

Возможно ли, что у вас одновременно работают два экземпляра кода процессора (я думаю, это консольное приложение)? На одинаковых или разных машинах? Или какое-то другое приложение или инструмент, удаляющий сообщения из очереди?

Раньше в одной из предварительных версий .NET 2.0 существовала ошибка, которая вызывала это в некоторых стрессовых ситуациях, но, насколько я помню, она была исправлена ​​до их отправки.

2 голосов
/ 11 января 2017

Это сбой из-за проблемы параллелизма во внутреннем методе ReceiveCurrent из MessageQueue. Трассировка стека исключений показывает вызов, исходящий из enumerator.Current, а исключение произошло в ReceiveCurrent. Enumerator.Current вызывает ReceiveCurrent с опцией «peek». Вы можете спросить, что у меня было также, когда я столкнулся с той же самой проблемой, как может провал взгляда с ошибкой "Сообщение уже получено"? Он только пытается посмотреть следующее сообщение, которое еще не получено? Ответ на этот вопрос лежит в коде ReceiveCurrent, который доступен для просмотра здесь: https://referencesource.microsoft.com/#System.Messaging/System/Messaging/MessageQueue.cs,02c33cc512659fd7,references

ReceiveCurrent сначала выполняет вызов StaleSafeReceive для просмотра следующего сообщения. Но если этот вызов возвращает, ему нужно больше памяти для получения всего сообщения (строка с «while (MessageQueue.IsMemoryError (status)» в своем исходном коде) выделяет необходимую память и выполняет другой вызов StaleSafeReceive для получения сообщения. Это очень классический шаблон использования Win32 API, поскольку в конечном итоге он основан на C.

Проблема здесь в том, что если между первым и вторым вызовом StaleSafeReceive внутри ReceiveCurrent «получает» другой процесс или поток, т. Е. Удаляет это сообщение из очереди, второй вызов вызывает именно это исключение. И вот как операция «Peek» не удается. Обратите внимание, что это может быть любое сообщение, которое просматривает перечислитель, вызвавший исключение, а не сообщение, которое ищется. Это объясняет, почему сообщение с таким идентификатором задания все еще находится в очереди после того, как возникла исключительная ситуация и метод завершился неудачей.

Что можно сделать, это защитить enumerator. Текущий вызов с помощью try catch, и если это конкретное исключение перехватывается, просто продолжайте перечисление со следующим доступным сообщением в очереди.

Я использовал объект Cursor, а не перечислитель, но он сталкивается с той же проблемой. Но при использовании курсора существует другой способ уменьшить риск этого, то есть при сканировании / просмотре сообщения отключить все ненужные свойства объекта MessagePropertyFilter текущего объекта очереди, особенно свойство Body. Поскольку во время просмотра тела обычно не требуется получать тело, но чаще всего тело сообщения вызывает перераспределение памяти и требует второго вызова StaleSafeReceive внутри ReceiveCurrent. Тем не менее, попытка перехвата этого исключения потребовалась бы и при прямом использовании курсора с вызовами peek.

...