В: Я не знаю, почему чтение идет медленно?
Давайте сначала договоримся о том, что у каждой реализации процесса есть некоторый предел производительности, который не может быть нарушен даже при использовании бесконечно большей мощности.
Далее, отметим, что концепция любого Queue
по своей природе SERIAL
, а не CONCURRENT
(может быть добавлен параллелизм к чистому SERIAL
извлечению сообщения «один за другим, после другого», но за дополнительную плату, оба с точки зрения производительности (пропускная способность, очевидно, уменьшается, чтобы иметь возможность получать + сигнализирует + обрабатывает и контролирует множество (в настоящее время) CONCURRENT
-читанных запросов с внутренним по-прежнему чистым заголовком очереди SERIAL для безопасного процесса и доставки очереди и с задержкой (задержка, очевидно, увеличивается, часто во много раз на несколько порядков выше, по сравнению с оригинальной, несложной, чистой - SERIAL
, исключительной механикой доступа к сообщениям монополиста с малой задержкой).
С учетом вышесказанного, истинное PARALLEL
планирование процессов («все читают все сообщения в один и тот же момент» (все обслуживается за один раз, синхронно, можно добавить)) просто никогда не происходит один экземпляр экземпляра очереди, никогда.
Решение:
Для более быстрой пропускной способности лучший способ состоит в том, чтобы поддерживать чистую SERIAL, низко-латентную обработку головного узла в экземпляре Queue и отправлять каждое сообщение в какой-либо другой рабочий поток для обработки выгруженного сообщения, независимо от других сообщений, все еще ожидающих внутри (да, все еще и всегда будет) pure- SERIAL
очереди сообщений.
Существует много способов отправки уже выгруженного содержимого сообщения для дальнейшей обработки, и такой выбор зависит от вашей архитектуры и проектных решений. Оба inproc://
или ipc://
(без стека протоколов) средства межпотоковой или межпроцессорной связи хорошо оснащены для использования пулов из многих сотен потоков обработки или достаточно (совместно размещенные или даже широко распространяемые) процессы обработки содержимого сообщений, если вашему приложению требуется увеличение и увеличение производительности, но при этом сохраняется минимально возможная задержка.
Остерегайтесь всех дополнительных расходов
, все еще оставшихся внутри while(true){...}
-инфинитных циклов,
любые такие, тем более повторяющиеся, вырождают Закон Амдала Вычисляем PARALLEL-код Speedup :
Код «как есть» (как сообщает @ TheodorZoulias ) повторяет и повторяет все экземпляры - на самом деле нет оснований для повторного создания, выбрасывания и повторного создания всех 30-предложенных глав очереди -концы за каждый цикл цикла. Кажется, это худшая последовательность шагов и самое плохое управление ресурсами из когда-либо возможных:
while (true) //--------------------------------- INFINITE LOOP
{ //----- NEW ARRAY CREATED PER-LOOP
var blockArray = Enumerable.Range(0, 30).ToArray();
//----- NEW .ForEach(){...} CREATED PER-LOOP
// |||||||||||||||||||||||||||||||||||||||||||||||||||||||PAR||| BLOCK
Parallel.ForEach(blockArray, (i) => // ||||||||||||||||PAR||| BLOCK
{ //----- NEW QUEUE CREATED PER-LOOP x PER-BLOCK
MessageQueue queue = new MessageQueue(queueName);
try
{ var message = queue.Receive(TimeSpan.Zero);
message.Formatter = new BinaryMessageFormatter();
var labelParts = message.Label.Split('_');
var isValidMessageAddress= Validate(labelParts);
if (isValidMessageAddress)
{
//--------------------- PER MESSAGE PAYLOAD PROCESSING START
// call my sysnc method
//--------------------- PER MESSAGE PAYLOAD PROCESSING END
}
}
catch (MessageQueueException mqex)
{ if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
{ return; //-- <<<< ???
//------------ EACH IOTimeout KILLS +ONE POOL-MEMBER
//------------ 30th IOTimeout LEAVES THE POOL EMPTY
}
else throw;
//-------------- NO ERROR HANDLING
}
});
// |||||||||||||||||||||||||||||||||||||||||||||||||||||||PAR||| BLOCK
} //-------------------------------------------- INFINITE-LOOP
Может быть довольно переформулировано (плюс повторное использование / предварительное распределение переменных может помочь еще больше):
// |||||||||||||||||||||||||||||||||||||||||||||||||||||||||:PAR||| BLOCK
Parallel.ForEach(blockArray, (i) => // |||||||||||||||:PAR||| BLOCK
{ //--- A QUEUE HEAD-END CREATED ONLY ONCE! PER-BLOCK
MessageQueue queue = new MessageQueue(queueName); //:PAR||| BLOCK
while (true) //----PER-AGENT's [SERIAL]READING LOOP//:PAR||| BLOCK
{ //:PAR||| BLOCK
try //----PER-AGENT's TRY{} //:PAR||| BLOCK
{ var message = queue.Receive( TimeSpan.Zero );
message.Formatter = new BinaryMessageFormatter();
if ( Validate( message.Label.Split( '_' )))//:PAR||| BLOCK
{ //------PER-AGENT's---- PER MESSAGE //:PAR||| BLOCK
// call my sysnc method //:PAR||| BLOCK
//------PER-AGENT's---- PER MESSAGE //:PAR||| BLOCK
} //:PAR||| BLOCK
} //:PAR||| BLOCK
catch ... //:PAR||| BLOCK
//-------------PER-AGENT's PER EXCEPTION HANDLING:PAR||| BLOCK
// WITHOUT ANY POOL AGENT:PAR||| BLOCK
// CANNIBALISATION :PAR||| BLOCK
} // --------------PER-AGENT's [SERIAL]READING LOOP :PAR||| BLOCK
} //|||||||||||||||||||||||||||||||||||||||||||||||||||||:PAR||| BLOCK
// |||||||||||||||||||||||||||||||||||||||||||||||||||||||||:PAR||| BLOCK
Пусть больше потоков для перемещения большего количества сообщений - масштабирование производительности
Как только концепция дизайна кода становится ясной и обоснованной, и если проблема связана с управлением на уровне очереди, высокопроизводительные структуры обмена сообщениями позволяют увеличить количество выделенных для очереди потоков ввода-вывода. Конверты достижимой производительности намного выше нескольких k[msg/s]
до нескольких ~ 10.000 ~ 100.000 ~ 1.000.000
сообщений в секунду , поэтому наличие нескольких тысяч сообщений в очереди в секунду для снятия с очереди определенно не проблема, если правильное проектирование введено в действие (предполагая, что Очередь не перемещает некоторые действительно очень большие BLOB-ы, где дополнительно должны быть сделаны некоторые трюки Zero-Copy / перемещение указателя, чтобы поддерживать темп)