Как обрабатывать сообщения MSMQ параллельно - PullRequest
16 голосов
/ 31 марта 2011

Я пишу службу Windows для приема сообщений MSMQ.Служба будет иметь периоды высокой активности (80 000 сообщений, поступающих очень быстро) и длительные периоды бездействия (может быть несколько дней без нового сообщения).

Обработка сообщений очень привязана к сети, поэтому я получаюбольшая выгода от параллелизма.Но в периоды бездействия я не хочу связывать кучу потоков, ожидающих сообщений, которые не приходят в ближайшее время.

Интерфейс MSMQ, похоже, очень ориентирован на синхронный рабочий процесс - получите одинсообщение, обработать его, получить другое и т. д. Как мне структурировать свой код, чтобы я мог использовать преимущества параллелизма в периоды высокой активности, но не связывать кучу потоков в периоды бездействия?Бонусные баллы за использование TPL.Псевдокод был бы оценен.

Ответы [ 7 ]

11 голосов
/ 01 апреля 2011

За прошедшие годы я выполнил множество MSMQ (включая мобильные реализации), и вы правы в характеристике «синхронного рабочего процесса».Дело не в том, что вы не можете принимать различные конверты сообщений и обрабатывать их в разных ядрах с помощью TPL ... ограничивающим фактором является чтение / запись в очередь ... по своей сути последовательная операция.Вы не можете отправить 8 сообщений одновременно (например, компьютер с 8 ядрами).

У меня была похожая потребность (без использования пространства имен System.Messaging), и я решил ее с помощью справки из книги, которую я прочитал«Параллельное программирование с Microsoft.NET» Кэмпбеллом и Джонсоном.

Ознакомьтесь с их главой «Параллельные задачи» и, в частности, с частью использования глобальной очереди, которая взаимодействует с локальными очередями для каждого потока для обработки работы (т. Е.TPL), которые используют алгоритм «кражи работы» для балансировки нагрузки.Я смоделировал свое решение, частично, по их примеру.Финальная версия моей системы имела огромную разницу в производительности (от 23 сообщений в секунду до более 200).

В зависимости от того, сколько времени ваша система перейдет от 0 до 80000, вам потребуетсявзять один и тот же дизайн и распределить его по нескольким серверам (каждый с несколькими процессорами и несколькими ядрами).Теоретически моей установке потребовалось бы чуть меньше 7 минут, чтобы отшлифовать все 80K, поэтому, добавив второй компьютер, можно было бы сократить это примерно до ~ 3 минут и 20 секунд, и т. Д., И т. Д., И т. Д.логика кражи работы.

Пища для размышлений…

Быстрое редактирование: Кстати, компьютер представляет собой рабочую станцию ​​Dell T7500 с двухъядерным процессором Xeons @ 3GHz, 24 ГБ ОЗУ, Windows 7 Ultimate 64-битное издание.

6 голосов
/ 08 апреля 2011

Вот упрощенная версия того, что я в итоге делал:

while(true) {
    int msgCount = 0;

    Parallel.ForEach(Enumerable.Range(0,20), (i) => {
        MessageQueue queue = new MessageQueue(_queuePath);

        try {
            msg = queue.Receive(TimeSpan.Zero);
            // do work

            Interlocked.Increment(ref msgCount);
        catch(MessageQueueException mqex) {
            if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) {
                return; // nothing in queue
            }
            else throw;
        }           
    }

    if (msgCount < 20) 
        Thread.Sleep(1000); // nothing more to do, take a break
}

Поэтому я стараюсь получать сообщения по 20 одновременно, считая те, которые я получаю.Для тех 20 я позволил TPL пойти в город.В конце, если я обработал менее 20 сообщений, очередь пуста, и я сплю поток на секунду, прежде чем пытаться снова.

3 голосов
/ 01 апреля 2011

NServiceBus имеет отличную концепцию для этой самой проблемы.Это называется Дистрибьютор .Идея состоит в том, что распространитель может переслать работу, которую необходимо выполнить, и распределить ее по любому количеству работающих дочерних узлов.В зависимости от типа выполняемой работы, например, тяжелых вычислений или записи на диск, вы можете распределить это по нескольким процессам или даже по нескольким машинам.

2 голосов
/ 29 апреля 2013

Вот как я это делаю (изменяя код на лету, чтобы не было орфографических ошибок):

for (int i = 0; i < numberOfSimultaneousRequests; i++)
            priorityQueue.BeginReceive(TimeSpan.FromDays(30), state, callback);

и обратный вызов выглядит примерно так:

private void ProcessMessage(IAsyncResult asyncResult)
    {
        try
        {
            Message msg = priorityQueue.EndReceive(asyncResult);
            //Do something with the message
        }
        finally
        {
            priorityQueue.BeginDequeue(null, ProcessMessage);//start processing another one
        }
2 голосов
/ 30 ноября 2012

Решение также частично зависит от того, как обрабатываются сообщения.

Я использовал WorkflowService, размещенный в Windows Server AppFabric с привязкой Net.Msmq и транзакционной очередью.Привязка транзакции net.msmq была необходима для обработки не по порядку обработки сообщений.Рабочий процесс - конечный автомат .Net 4.0.1, и сообщения поступают в одну и ту же очередь из разных систем.Например, возможно, чтобы одна система отправила обновление экземпляру конечного автомата до того, как другая система отправит сообщение для его создания.Чтобы включить обработку сообщений не по порядку, узел службы рабочего процесса использует BufferedReceive для блокировки сообщений и многократно повторяет попытку получения их из под блокировки очереди.В BufferedReceive максимальное число ожидающих сообщений установлено равным максимально вероятной длине пакета, поскольку сообщения в заблокированной очереди возвращаются в очередь повторов в передней части .

WF также имеет ряд настроек регулирования.Максимально допустимая длина пакета составляет около 20000. Я установил MaxConcurrentCalls на 64, а MaxConcurrentInstances на 20000. В результате IIS / WAS обрабатывает 64 одновременных вызова.

Однако, и это именно так, потому чтоПолучения в рабочем процессе являются односторонними, это не означает, что порожденные процессы WF завершаются, как только завершается получение.Далее в моем сценарии происходит следующее: после удаления сообщения и вызова экземпляра WF, который является одним из 64 вызовов, механизм рабочих процессов планирует ряд следующих шагов, один из которых - операция базы данных.

Гвоздь в том, что 64 вызова могут быть максимальными, но если скорость утечки сообщений выше, чем скорость завершения асинхронного процесса, так как при обработке входящей партии сообщений будет большее число исполняющих потоков (вмой случай WF экземпляры).Это может привести к непредвиденным ситуациям, например, для пула соединений ADO.NET значение по умолчанию равно 100 как максимальное количество соединений.Это приведет к истечению времени ожидания процессов для соединений из исчерпанного пула.Для этой конкретной проблемы вы можете либо повысить значение MaxPoolSize, либо использовать компонент Service Broker для асинхронной обработки операций с БД (что означает большую сложность в рабочем процессе).

Надеюсь, это кому-нибудь поможет.

1 голос
/ 06 августа 2015

Я нашел полное решение в блоге под названием CodeRonin.Мне кажется, что это единственный полный пример во всем Интернете.Спасибо, CodeRonin!

http://code -ronin.blogspot.de / 2008/09 / msmq-транзакция-message-processing.html

1 голос
/ 20 июля 2011

Просто пытаясь сделать что-то похожее, tpl, похоже, может генерировать какое-то исключение безопасности потока, когда он сталкивается с физическими проблемами, например, попытаться создать sqlconnection за пределами forech tpl и использовать его в теле цикла - он выдал исключение для меня. Я создаю очередь перед входом в тело, перечисляя по списку строк, и кажется, что все в порядке, мой код обрабатывал 10000 элементов постоянно в течение 500 мс, используя обмен сообщениями 1way на i7 2500 8 ГБ и локальный msmq

...