Чтение из MSMQ замедляется, когда в очереди много сообщений - PullRequest
8 голосов
/ 27 июня 2011

Краткое введение

У меня есть система на основе SEDA, и я использовал MSMQ для связи (запуска событий) между различными приложениями / службами.

Одна из этих служб получает сообщения по файлам, поэтому у меня есть прослушиватель файлов, который считывает содержимое файла и вставляет его в очередь (или фактически в 4 разных очереди, но это не очень важно для первого вопроса).

Сервер - Windows Server 2008

Первый вопрос - замедление чтения

Мое приложение, которое читает эти сообщения на другой стороне, обычно читает около 20 сообщенийиз очереди в секунду, но когда служба, отправляющая сообщения, начинает ставить в очередь несколько тысяч сообщений, чтение прекращается, и приложение чтения читает только 2-4 сообщения в секунду.Когда нет сообщений в очереди, приложение чтения может снова читать до 20 сообщений в секунду.

Код в приложении для чтения довольно прост, разработан на C #, я использую функцию Read (TimeSpan timeout) в System.Messaging.

Q: Почему чтение замедляется, когдамного сообщений отправлено в очередь?

Второй вопрос - ограничения TPS

Дополнительный вопрос касается самого чтения.Кажется, нет разницы в том, сколько сообщений я могу прочитать в секунду, если я использую 1 или 5 потоков для чтения из очереди.Я также пытался реализовать «циклическое решение», при котором почтовый сервис отправляет в произвольный набор из 4 очередей, а приложение чтения имеет один поток, прослушивающий каждую из этих очередей, но все равно остается только 20 TPS, даже если ячтение из 1 очереди с 1 потоком, 1 очереди с 4 потоками или 4 очередями (с одним потоком на очередь).

Я знаю, что обработка в потоке занимает около 50 мс, поэтому 20 TPS вполне корректно, если за один раз обрабатывается только одно сообщение, но подсказка с многопоточностью должна заключаться в том, что сообщения обрабатываются параллельно ине последовательный.

На сервере около 110 различных очередей.

В: Почему я не могу получить более 20 сообщений из своей очереди одновременно, даже с многопоточностью ииспользование нескольких очередей?

Этот код работает сегодня:

// There are 4 BackgroundWorkers running this function
void bw_DoWork(object sender, DoWorkEventArgs e) 
{
    using(var mq = new MessageQueue(".\\content"))
    {
        mq.Formatter = new BinaryMessageFormatter();

        // ShouldIRun is a bool set to false by OnStop()
        while(ShouldIRun)
        {
            try
            {
                using(var msg = mq.Receive(new TimeSpan(0,0,2))
                {
                    ProcessMessageBody(msg.Body); // This takes 50 ms to complete
                }
            }
            catch(MessageQueueException mqe)
            {
               // This occurs every time TimeSpan in Receive() is reached
               if(mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) 
                   continue;
            }
        }
    }

Но даже если есть 4 потока, кажется, что все ждут, пока функция снова войдет в точку «Прием».,Я также попытался использовать 4 разные очереди (content1, content2, content3 и content4), но все равно я получаю 1 сообщение, обрабатываемое каждые 50 мс.

Имеет ли это какое-либо отношение к TimeSpan в Receive (), и / или возможно ли это пропустить?

Другой вопрос, если использование частных очередей вместо публичной волирешить что-нибудь?

Ответы [ 2 ]

4 голосов
/ 27 июня 2011

Проблемы с производительностью.
Вы не упоминаете, работает ли весь код на сервере или если клиенты имеют удаленный доступ к очередям на сервере.Исходя из скорости, я буду считать последнее.
Кроме того, являются ли очереди транзакционными?
Насколько велики сообщения?

Если вы хотите прочитать сообщение из очереди, ваше приложение делаетне подключаться к самой очереди.Все идет между локальным администратором очередей и удаленным администратором очередей.Администратор очередей является единственным процессом, который пишет и читает из очередей.Поэтому наличие нескольких очередей или одной очереди не обязательно будет работать по-другому.

Таким образом, администратор очередей MSMQ может стать узким местом в какой-то момент, поскольку в то же время может быть выполнено только так много работы,Ваш первый вопрос показывает это - когда вы сильно загружаете менеджер очередей, помещая сообщения IN, ваша способность принимать сообщения OUT замедляется.Я бы порекомендовал посмотреть на монитор производительности, чтобы убедиться, что MQSVC.EXE максимально загружен, например.

2 голосов
/ 27 июня 2011

Почему вы используете временной интервал?- это плохо, и вот почему.

При разработке сервисов и очередей вам нужно программировать безопасным способом.Каждый элемент в очереди порождает новый поток.Использование timepan заставляет каждый из потоков использовать один поток событий таймера.Этим событиям приходится ждать своей очереди в потоке событий.

Норма - 1 поток на события в очереди - обычно это ваше событие System.Messaging.ReceiveCompletedEventArgs.Другой поток - это ваше событие onStart ...

20 потоков или 20 операций чтения в секунду, вероятно, правильно.Обычно при объединении потоков в .net можно создавать только 36 потоков одновременно.

Мой совет - отбросить событие таймера и заставить вашу очередь просто обработать данные.

сделать что-то еще подобное;

namespace MessageService 
{ 

public partial class MessageService : ServiceBase 

{ 

    public MessageService() 

    { 

        InitializeComponent(); 

    } 



    private string MessageDirectory = ConfigurationManager.AppSettings["MessageDirectory"]; 

    private string MessageQueue = ConfigurationManager.AppSettings["MessageQueue"]; 



    private System.Messaging.MessageQueue messageQueue = null; 



    private ManualResetEvent manualResetEvent = new ManualResetEvent(true); 





    protected override void OnStart(string[] args) 

    { 

        // Create directories if needed 

        if (!System.IO.Directory.Exists(MessageDirectory)) 

            System.IO.Directory.CreateDirectory(MessageDirectory); 



        // Create new message queue instance 

        messageQueue = new System.Messaging.MessageQueue(MessageQueue); 



        try 

        {    

            // Set formatter to allow ASCII text 

            messageQueue.Formatter = new System.Messaging.ActiveXMessageFormatter(); 

            // Assign event handler when message is received 

            messageQueue.ReceiveCompleted += 

                new System.Messaging.ReceiveCompletedEventHandler(messageQueue_ReceiveCompleted); 

            // Start listening 



            messageQueue.BeginReceive(); 

        } 

        catch (Exception e) 

        { 



        } 

    } 



    protected override void OnStop() 

    { 

        //Make process synchronous before closing the queue 

        manualResetEvent.WaitOne(); 





        // Clean up 

        if (this.messageQueue != null) 

        { 

            this.messageQueue.Close(); 

            this.messageQueue = null; 

        } 

    } 



    public void messageQueue_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e) 

    { 

        manualResetEvent.Reset(); 

        System.Messaging.Message completeMessage = null; 

        System.IO.FileStream fileStream = null; 

        System.IO.StreamWriter streamWriter = null; 

        string fileName = null; 

        byte[] bytes = new byte[2500000]; 

        string xmlstr = string.Empty;                

            try 

            { 

                // Receive the message 

                completeMessage = this.messageQueue.EndReceive(e.AsyncResult);                    

                completeMessage.BodyStream.Read(bytes, 0, bytes.Length); 



                System.Text.ASCIIEncoding ascii = new System.Text.ASCIIEncoding(); 



                long len = completeMessage.BodyStream.Length; 

                int intlen = Convert.ToInt32(len);                   

                xmlstr = ascii.GetString(bytes, 0, intlen);                   

            } 

            catch (Exception ex0) 

            { 

                //Error converting message to string                    

            } 

        }
...