Почему чтение слишком большого количества сообщений из MSMQ параллельно происходит так медленно? - PullRequest
0 голосов
/ 06 июля 2019

У меня есть приложение, которое пишет 1500 сообщений в секунду.

В другом сервисе я читаю эти сообщения параллельно и обрабатываю. Но чтение из очереди идет медленно, и я думаю, что узким местом является queue.Receive(TimeSpan.Zero).

Не знаю, почему чтение идет медленно?

Мой сервис работает на сервере с хорошими вычислительными возможностями.

Это мой код.

static Task Main()
        {
      GetFromQueueAsync();
    }


 private static Task GetFromQueueAsync()
        {
            string queueName = ConfigurationManager.AppSettings["QueueName"].ToString();

            while (true)
            {
                var blockArray = Enumerable.Range(0, 30).ToArray();

                Parallel.ForEach(blockArray, (i) =>
                {
                    MessageQueue queue = new MessageQueue(queueName);

                    try
                    {                 
                        var message = queue.Receive(TimeSpan.Zero);

                        message.Formatter = new BinaryMessageFormatter();
                        var labelParts = message.Label.Split('_');

                        var isValidMessageAddress=  Validate(labelParts);

                        if (isValidMessageAddress)
                        {
                //call my sysnc method
                        }

            }
                    catch (MessageQueueException mqex)
                    {
                        if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                        {
                            return;
                        }
                        else throw;
                    }
                });
            }

1 Ответ

1 голос
/ 06 июля 2019

В: Я не знаю, почему чтение идет медленно?

Давайте сначала договоримся о том, что у каждой реализации процесса есть некоторый предел производительности, который не может быть нарушен даже при использовании бесконечно большей мощности.

Далее, отметим, что концепция любого Queue по своей природе SERIAL, а не CONCURRENT (может быть добавлен параллелизм к чистому SERIAL извлечению сообщения «один за другим, после другого», но за дополнительную плату, оба с точки зрения производительности (пропускная способность, очевидно, уменьшается, чтобы иметь возможность получать + сигнализирует + обрабатывает и контролирует множество (в настоящее время) CONCURRENT -читанных запросов с внутренним по-прежнему чистым заголовком очереди SERIAL для безопасного процесса и доставки очереди и с задержкой (задержка, очевидно, увеличивается, часто во много раз на несколько порядков выше, по сравнению с оригинальной, несложной, чистой - SERIAL, исключительной механикой доступа к сообщениям монополиста с малой задержкой).

С учетом вышесказанного, истинное PARALLEL планирование процессов («все читают все сообщения в один и тот же момент» (все обслуживается за один раз, синхронно, можно добавить)) просто никогда не происходит один экземпляр экземпляра очереди, никогда.

Решение:

Для более быстрой пропускной способности лучший способ состоит в том, чтобы поддерживать чистую SERIAL, низко-латентную обработку головного узла в экземпляре Queue и отправлять каждое сообщение в какой-либо другой рабочий поток для обработки выгруженного сообщения, независимо от других сообщений, все еще ожидающих внутри (да, все еще и всегда будет) pure- SERIAL очереди сообщений.

Существует много способов отправки уже выгруженного содержимого сообщения для дальнейшей обработки, и такой выбор зависит от вашей архитектуры и проектных решений. Оба inproc:// или ipc:// (без стека протоколов) средства межпотоковой или межпроцессорной связи хорошо оснащены для использования пулов из многих сотен потоков обработки или достаточно (совместно размещенные или даже широко распространяемые) процессы обработки содержимого сообщений, если вашему приложению требуется увеличение и увеличение производительности, но при этом сохраняется минимально возможная задержка.

Остерегайтесь всех дополнительных расходов
, все еще оставшихся внутри while(true){...} -инфинитных циклов,
любые такие, тем более повторяющиеся, вырождают Закон Амдала Вычисляем PARALLEL-код Speedup :

Код «как есть» (как сообщает @ TheodorZoulias ) повторяет и повторяет все экземпляры - на самом деле нет оснований для повторного создания, выбрасывания и повторного создания всех 30-предложенных глав очереди -концы за каждый цикл цикла. Кажется, это худшая последовательность шагов и самое плохое управление ресурсами из когда-либо возможных:

        while (true) //--------------------------------- INFINITE LOOP
        {            //----- NEW ARRAY                CREATED PER-LOOP
            var blockArray = Enumerable.Range(0, 30).ToArray();
                     //----- NEW .ForEach(){...}      CREATED PER-LOOP                
            // |||||||||||||||||||||||||||||||||||||||||||||||||||||||PAR||| BLOCK
            Parallel.ForEach(blockArray, (i) =>    // ||||||||||||||||PAR||| BLOCK
            {        //----- NEW QUEUE                CREATED PER-LOOP x PER-BLOCK
                MessageQueue queue = new MessageQueue(queueName);
                try
                {   var message = queue.Receive(TimeSpan.Zero);
                    message.Formatter = new BinaryMessageFormatter();
                    var labelParts = message.Label.Split('_');
                    var isValidMessageAddress=  Validate(labelParts);
                    if (isValidMessageAddress)
                    {
                     //--------------------- PER MESSAGE PAYLOAD PROCESSING START
                     // call my sysnc method
                     //--------------------- PER MESSAGE PAYLOAD PROCESSING END
                    }
                }
                catch (MessageQueueException mqex)
                {   if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                    {   return; //-- <<<< ???
                      //------------ EACH IOTimeout KILLS +ONE POOL-MEMBER
                      //------------ 30th IOTimeout LEAVES THE POOL EMPTY
                    }
                    else throw;
                    //-------------- NO ERROR HANDLING
                }
            });
            // |||||||||||||||||||||||||||||||||||||||||||||||||||||||PAR||| BLOCK
        } //-------------------------------------------- INFINITE-LOOP

Может быть довольно переформулировано (плюс повторное использование / предварительное распределение переменных может помочь еще больше):

         // |||||||||||||||||||||||||||||||||||||||||||||||||||||||||:PAR||| BLOCK
            Parallel.ForEach(blockArray, (i) =>    // |||||||||||||||:PAR||| BLOCK
            {        //--- A QUEUE HEAD-END           CREATED ONLY ONCE! PER-BLOCK
                MessageQueue queue = new MessageQueue(queueName);  //:PAR||| BLOCK
                while (true) //----PER-AGENT's [SERIAL]READING LOOP//:PAR||| BLOCK
                {                                                  //:PAR||| BLOCK
                    try      //----PER-AGENT's TRY{}               //:PAR||| BLOCK
                    {   var            message = queue.Receive( TimeSpan.Zero );
                                       message.Formatter = new BinaryMessageFormatter();
                        if ( Validate( message.Label.Split( '_' )))//:PAR||| BLOCK
                        {  //------PER-AGENT's---- PER MESSAGE     //:PAR||| BLOCK
                           // call my sysnc method                 //:PAR||| BLOCK
                           //------PER-AGENT's---- PER MESSAGE     //:PAR||| BLOCK
                        }                                          //:PAR||| BLOCK
                    }                                              //:PAR||| BLOCK
                    catch ...                                      //:PAR||| BLOCK
                    //-------------PER-AGENT's PER EXCEPTION HANDLING:PAR||| BLOCK
                    //                         WITHOUT ANY POOL AGENT:PAR||| BLOCK
                    //                         CANNIBALISATION       :PAR||| BLOCK
                } // --------------PER-AGENT's [SERIAL]READING LOOP  :PAR||| BLOCK
            } //|||||||||||||||||||||||||||||||||||||||||||||||||||||:PAR||| BLOCK
         // |||||||||||||||||||||||||||||||||||||||||||||||||||||||||:PAR||| BLOCK

Пусть больше потоков для перемещения большего количества сообщений - масштабирование производительности

Как только концепция дизайна кода становится ясной и обоснованной, и если проблема связана с управлением на уровне очереди, высокопроизводительные структуры обмена сообщениями позволяют увеличить количество выделенных для очереди потоков ввода-вывода. Конверты достижимой производительности намного выше нескольких k[msg/s] до нескольких ~ 10.000 ~ 100.000 ~ 1.000.000 сообщений в секунду , поэтому наличие нескольких тысяч сообщений в очереди в секунду для снятия с очереди определенно не проблема, если правильное проектирование введено в действие (предполагая, что Очередь не перемещает некоторые действительно очень большие BLOB-ы, где дополнительно должны быть сделаны некоторые трюки Zero-Copy / перемещение указателя, чтобы поддерживать темп)

...