Самый эффективный способ обработки очереди с потоками - PullRequest
15 голосов
/ 01 июня 2011

У меня есть очередь, в которую помещаются незавершенные запросы на преобразование Фурье (сравнительно трудоемкие операции) - в некоторых случаях мы можем получать тысячи запросов на преобразование в секунду, поэтому он должен быть быстрым.

Я обновляю старый код для использования .net 4, а также портирую на TPL. Мне интересно, как выглядит наиболее эффективный (самый быстрый пропуск) способ обработки этой очереди. Я хотел бы использовать все доступные ядра.

В настоящее время я экспериментирую с коллекцией BlockingCollection. Я создаю класс обработчика очереди, который порождает 4 задачи, которые блокируют коллекцию BlockingCollection и ожидают входящую работу. Затем они обрабатывают это ожидающее преобразование. Код:

public class IncomingPacketQueue : IDisposable
    {
        BlockingCollection<IncomingPacket> _packetQ = new BlockingCollection<IncomingPacket>();

        public IncomingPacketQueue(int workerCount)
        {
            for (int i = 0; i < workerCount; i++)
            {
                Task.Factory.StartNew(Consume);
            }
        }

        public void EnqueueSweep(IncomingPacket incoming)
        {
            _packetQ.Add(incoming);
        }

        private void Consume()
        {
            foreach (var sweep in _packetQ.GetConsumingEnumerable())
            {
                //do stuff
                var worker = new IfftWorker();
                Trace.WriteLine("  Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId));
                worker.DoIfft(sweep);                

            }
        }

        public int QueueCount
        {
            get
            {
                return _packetQ.Count;
            }
        }

    #region IDisposable Members

    public void Dispose()
    {
        _packetQ.CompleteAdding();
    }

    #endregion
    }

Это похоже на хорошее решение? Похоже, что все ядра максимально загружены - хотя в настоящее время я не уверен, сколько рабочих я должен создать в конструкторе.

Ответы [ 5 ]

7 голосов
/ 01 июня 2011

Это выглядит разумно.Я обнаружил, что BlockingCollection довольно быстро.Я использую его для обработки десятков тысяч запросов в секунду.

Если ваше приложение связано с процессором, то вы, вероятно, не хотите создавать больше рабочих, чем у вас есть ядра.Конечно, вы не хотите создавать гораздо больше рабочих, чем ядра.На четырехъядерном компьютере, если вы ожидаете, что большую часть времени будет потрачено на выполнение БПФ, тогда четыре сотрудника съедят весь процессор.Больше работников просто означает больше, что у вас есть переключатели контекста потока, чтобы иметь дело с.TPL обычно компенсирует это для вас, но нет причин создавать, скажем, 100 рабочих, если вы не можете справиться с более чем несколькими.

Я бы посоветовал вам запустить тесты с 3, 4, 56, 7 и 8 рабочих.Посмотрите, какой из них дает вам лучшую пропускную способность.

2 голосов
/ 01 июня 2011

Почему бы не использовать Parallel.ForEach и позволить TPL обрабатывать количество созданных потоков.

        Parallel.ForEach(BlockingCollectionExtensions.GetConsumingPartitioneenter(_packetQ),
                         sweep => {
                           //do stuff
                           var worker = new IfftWorker();
                           Trace.WriteLine("  Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId));
                           worker.DoIfft(sweep);                

                         });

(GetConsumingPartitioner является частью ParallelExtensionsExtras )

2 голосов
/ 01 июня 2011

Я согласен с Джимом. Ваш подход выглядит действительно хорошо. Вам не станет намного лучше. Я не эксперт FFT, но я предполагаю, что эти операции почти на 100% связаны с процессором. Если это действительно так, то хорошим первым предположением о количестве рабочих будет прямая корреляция 1: 1 с количеством ядер в машине. Вы можете использовать Environment.ProcessorCount, чтобы получить это значение. Вы можете поэкспериментировать с множителем, скажем, 2x или 4x, но, опять же, если эти операции связаны с процессором, то любое значение, превышающее 1x, может просто вызвать дополнительные издержки. Использование Environment.ProcessorCount сделает ваш код более переносимым.

Еще одно предложение ... сообщите TPL, что это выделенные темы. Вы можете сделать это, указав параметр LongRunning.

public IncomingPacketQueue()
{
    for (int i = 0; i < Environment.ProcessorCount; i++)
    {
        Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }
}
0 голосов
/ 01 июня 2011

Вы также можете попробовать использовать PLINQ для распараллеливания обработки, чтобы увидеть, как она соотносится с подходом, который вы используете в настоящее время. У него есть некоторые хитрости в рукаве, которые могут сделать его очень эффективным при определенных обстоятельствах.

_packetQ.GetConsumingEnumerable().AsParallel().ForAll(
    sweep => new IfftWorker().DoIfft(sweep));
0 голосов
/ 01 июня 2011

Сделать количество рабочих настраиваемым.Кроме того, слишком много рабочих, и это будет работать медленнее (как указано на другом плакате), поэтому вам нужно найти наилучшее место.Настраиваемое значение позволит тестовым прогонам находить оптимальное значение или позволяет адаптировать вашу программу к различным типам оборудования.Вы, безусловно, можете поместить это значение в App.Config и прочитать его при запуске.

...