Потокобезопасная блокировка взаимной очереди ByteArray - PullRequest
2 голосов
/ 10 апреля 2010

Поток байтов должен быть передан, и есть один поток производителя и потребительский. Скорость производителя в большинстве случаев выше, чем у потребителя, и мне нужно достаточно буферизованных данных для QoS моего приложения. Я прочитал о своей проблеме, и есть решения, такие как общий буфер, класс PipeStream .NET ... Этот класс будет многократно создаваться на сервере, поэтому мне нужно и оптимизированное решение. Это хорошая идея использовать очередь ByteArray?

Если да, я буду использовать алгоритм оптимизации, чтобы угадать размер очереди и каждую емкость ByteArray, и теоретически она соответствует моему случаю.

Если нет, то какой подход лучше?

Пожалуйста, дайте мне знать, если есть хорошая поточная безопасная реализация ByteArray Queue в C # или VB.

Заранее спасибо

Ответы [ 6 ]

3 голосов
/ 12 апреля 2010

Вы, вероятно, получите гораздо больше ускорения, если вместо того, чтобы производить и потреблять побайтово, вы будете работать кусками. В этом случае «чистота блокировки» кода, вероятно, не будет иметь значения вообще - на самом деле, традиционное решение по блокировке может быть предпочтительнее. Я постараюсь продемонстрировать.

Без блокировки, один производитель, один потребитель, ограниченная очередь задается в C #. (Листинг А)
Там нет никаких эзотерических операций с блокировкой, даже нет явных барьеров памяти. Скажем так, на первый взгляд он настолько быстр и без блокировок, насколько это возможно. Не так ли?
Теперь давайте сравним его с решением по блокировке , которое дал Марк Гравелл, здесь .

Мы будем использовать двухпроцессорную машину, которая не имеет общего кэша L3 между ядрами. Мы ожидаем максимально 2-кратное ускорение. Ускорение в 2 раза действительно означало бы, что решение без блокировки работает идеально, в теоретических пределах.
Чтобы создать идеальную среду для кода без блокировки, мы даже установим сродство ЦП производителя и потока потребителя, используя служебный класс от здесь .
Полученный код теста находится в (Листинг B).

производит ок. 10 МБайт в одном потоке, а в другом -.
Размер очереди установлен в 32KBytes. Если он заполнен, производитель ждет.
Типичный прогон теста на моей машине выглядит так:

LockFreeByteQueue: 799мс
ByteQueue: 1843ms

Очередь без блокировки быстрее. Вау, это более чем в 2 раза быстрее! Это то, чем можно похвастаться. :)
Давайте посмотрим на то, что происходит. Блокировка очереди Марка делает именно это. Это замки. Это делает это для каждого байта.

Нужно ли нам блокировать каждый байт и передавать данные байт за байтом? Он, несомненно, поступает в сети частями (например, около 1 тыс. Пакетов). Даже если он действительно поступает побайтово из внутреннего источника, производитель может легко упаковать его в красивые куски.
Давайте просто сделаем это - вместо того, чтобы производить и потреблять побайтные байты, давайте работать в чанках и добавим два других теста к микропроцессору (в листинге C просто вставьте его в тело теста).
Теперь типичный пробег выглядит так:

LockFreePageQueue: 33 мс
PageQueue: 25мс

Теперь оба они на самом деле в 20 раз быстрее, чем исходный код без блокировки - Решение Марка с добавленным чанком теперь на быстрее , чем код без блокировки с блоками !
Вместо использования структуры без блокировки, которая привела бы к ускорению в 2 раза, мы попытались найти другое решение, которое прекрасно работает с блокировкой и привело к ускорению в 20 раз (!).
Ключ ко многим проблемам не столько в том, чтобы избегать блокировок, сколько в том, чтобы избежать разделения и минимизировать блокировку. В приведенном выше случае мы можем избежать совместного использования во время байтового копирования.
Мы можем работать с частной структурой большую часть времени, а затем ставить в очередь один указатель, сокращая тем самым общее пространство и время до одной вставки одного указателя в очередь.

Листинг A, очередь без единого производителя, очередь с одним потребителем:

public class BoundedSingleProducerSingleConsumerQueue<T>
{
    T[] queue;
    volatile int tail;
    volatile int head;

    public BoundedSingleProducerSingleConsumerQueue(int capacity)
    {
        queue = new T[capacity + 1];
        tail = head = 0;
    }

    public bool TryEnqueue(T item)
    {
        int newtail = (tail + 1) % queue.Length;
        if (newtail == head) return false;
        queue[tail] = item;
        tail = newtail;
        return true;
    }

    public bool TryDequeue(out T item)
    {
        item = default(T);
        if (head == tail) return false;
        item = queue[head];
        queue[head] = default(T);
        head = (head + 1) % queue.Length;
        return true;
    }
}

Листинг B, микро-тест:

class Program
{
    static void Main(string[] args)
    {
        for (int numtrials = 3; numtrials > 0; --numtrials)
        {
            using (ProcessorAffinity.BeginAffinity(0))
            {
                int pagesize = 1024 * 10;
                int numpages = 1024;
                int totalbytes = pagesize * numpages;

                BoundedSingleProducerSingleConsumerQueue<byte> lockFreeByteQueue = new BoundedSingleProducerSingleConsumerQueue<byte>(1024 * 32);
                Stopwatch sw = new Stopwatch();
                sw.Start();
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    using (ProcessorAffinity.BeginAffinity(1))
                    {
                        for (int i = 0; i < totalbytes; i++)
                        {
                            while (!lockFreeByteQueue.TryEnqueue((byte)(i & 0xFF))) ;
                        }
                    }
                });
                for (int i = 0; i < totalbytes; i++)
                {
                    byte tmp;
                    while (!lockFreeByteQueue.TryDequeue(out tmp)) ;
                }
                sw.Stop();
                Console.WriteLine("LockFreeByteQueue: {0}ms", sw.ElapsedMilliseconds);


                SizeQueue<byte> byteQueue = new SizeQueue<byte>(1024 * 32);
                sw.Reset();
                sw.Start();
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    using (ProcessorAffinity.BeginAffinity(1))
                    {
                        for (int i = 0; i < totalbytes; i++)
                        {
                            byteQueue.Enqueue((byte)(i & 0xFF));
                        }
                    }
                });

                for (int i = 0; i < totalbytes; i++)
                {
                    byte tmp = byteQueue.Dequeue();
                }
                sw.Stop();
                Console.WriteLine("ByteQueue: {0}ms", sw.ElapsedMilliseconds);

                Console.ReadKey();
            }
        }
    }
}

Листинг C, кусочные тесты:

BoundedSingleProducerSingleConsumerQueue<byte[]> lockfreePageQueue = new BoundedSingleProducerSingleConsumerQueue<byte[]>(32);
sw.Reset();
sw.Start();
ThreadPool.QueueUserWorkItem(delegate(object state)
{
    using (ProcessorAffinity.BeginAffinity(1))
    {
        for (int i = 0; i < numpages; i++)
        {
            byte[] page = new byte[pagesize];
            for (int j = 0; j < pagesize; j++)
            {
                page[j] = (byte)(i & 0xFF);
            }
            while (!lockfreePageQueue.TryEnqueue(page)) ;
        }
    }
});
for (int i = 0; i < numpages; i++)
{
    byte[] page;
    while (!lockfreePageQueue.TryDequeue(out page)) ;
    for (int j = 0; j < pagesize; j++)
    {
        byte tmp = page[j];
    }
}
sw.Stop();
Console.WriteLine("LockFreePageQueue: {0}ms", sw.ElapsedMilliseconds);

SizeQueue<byte[]> pageQueue = new SizeQueue<byte[]>(32);

ThreadPool.QueueUserWorkItem(delegate(object state)
{
    using (ProcessorAffinity.BeginAffinity(1))
    {
        for (int i = 0; i < numpages; i++)
        {
            byte[] page = new byte[pagesize];
            for (int j = 0; j < pagesize; j++)
            {
                page[j] = (byte)(i & 0xFF);
            }
            pageQueue.Enqueue(page);
        }
    }
});
sw.Reset();
sw.Start();
for (int i = 0; i < numpages; i++)
{
    byte[] page = pageQueue.Dequeue();
    for (int j = 0; j < pagesize; j++)
    {
        byte tmp = page[j];
    }
}
sw.Stop();
Console.WriteLine("PageQueue: {0}ms", sw.ElapsedMilliseconds);
2 голосов
/ 10 апреля 2010

Dr.Доббс реализовал очередь без блокировки в C ++ , которую вы могли бы относительно легко перенести в C #.Он работает, когда существует ровно один производитель (может быть любое количество потребителей).

Основная идея заключается в использовании двусвязного списка в качестве базовой структуры наряду с подвижной ссылкой на голову и хвост.Когда элемент создается, он добавляется в конец, и все между началом списка и текущей «головой» удаляется.Чтобы потреблять, попытайтесь поднять голову;если он попадает в хвост, терпит неудачу, если это не так, успешно и возвращает новый элемент.Особый порядок операций делает его по своей сути поточно-ориентированным.

Однако при использовании такой конструкции без блокировки существует две основные проблемы:

  1. Тамневозможно установить верхнюю границу размера очереди, что может быть серьезной проблемой, если ваш производитель работает быстрее вашего потребителя;

  2. По своей сути метод Consume должен простоне удается получить элемент, если ничего не было создано.Это означает, что вам нужно реализовать собственную собственную блокировку для потребителя, и такая блокировка неизменно либо ожидает занятости (что на намного хуже , чем блокировка в спектре производительности), либо синхронизирует ожидания (что еще больше замедляет работу вашего потребителя).

По этим причинам я бы порекомендовал вам серьезно подумать, действительно ли вам нужна структура без блокировки.Многие люди приходят на этот сайт, думая, что он будет «быстрее», чем эквивалентная структура, использующая блокировку, но практическая разница для большинства приложений настолько незначительна, что обычно не стоит дополнительной сложности, а в некоторых случаях это может на самом делевыполнение хуже , потому что состояния ожидания (или ожидаемые оповещения) намного дешевле, чем ожидание занятости.

Многоядерные машины и необходимость в барьерах памяти делают эффективную многопоточность без блокировки еще более сложной;при нормальной работе вы все равно можете выйти из строя, а в .NET дрожание может дополнительно решить переупорядочить инструкции, так что вам, вероятно, придется добавить в код переменные volatile и вызовы Thread.MemoryBarrier, что сноваможет способствовать повышению стоимости безблокировочной версии по сравнению с базовой синхронизированной версией.

Как насчет использования сначала простой старой синхронизированной очереди производителя и потребителя и профилирования вашего приложения, чтобы определить, стоит лиэто может удовлетворить ваши требования к производительности?На сайте Джозефа Албахари есть отличная и эффективная реализация очереди для ПК.Или, как упоминает Ричард, если вы используете платформу .NET 4.0, вы можете просто использовать ConcurrentQueue или, что более вероятно, BlockingCollection .

Test first - загрузить тестированиеСинхронизированная очередь, которую легко реализовать - и наблюдайте, сколько времени фактически потрачено на блокировку.Не ожидание , которое вам придется делать в любом случае, но на самом деле получение и освобождение блокировок после того, как они станут сигнальными.Если бы это было более 1% времени выполнения вашей программы, я был бы очень удивлен;но если это так, , тогда начните смотреть на реализации без блокировок - и убедитесь, что вы тоже их профилируете, чтобы убедиться, что они действительно работают лучше.

2 голосов
/ 10 апреля 2010

В .NET 4 есть System.Collections.Concurrent.Queue<T>, который настолько свободен от блокировок, насколько это возможно (хотя он и остается общим).

1 голос
/ 10 апреля 2010

Здесь важно регулирование, по звучанию этого, класс BoundedBuffer в этой журнальной статье отвечает всем требованиям. Аналогичный класс будет доступен в .NET 4.0 как класс BlockingCollection . Настройка размера буфера остается за вами.

0 голосов
/ 05 июня 2010

Самая важная часть - это дизайн общего объекта. В моем сценарии программа чтения и записи может использовать отдельные буферы (блоки больших данных) независимо, а затем синхронизировать должен только доступ к общему объекту FIFO, такому как очередь. Таким образом, время блокировки минимизируется, и потоки могут выполнять работу параллельно. А с .NET framewok 4.0 реализация этой концепции стала проще:

В System.Collections.Concurrent есть пространство класса ConcurrentQueue (Of T), и arrayByte - хороший тип для использования в качестве типа очереди для моего сценария. В пространстве имен есть другие поточно-ориентированные коллекции.

http://msdn.microsoft.com/en-us/library/system.collections.concurrent.aspx

0 голосов
/ 11 апреля 2010

Джулиану М. Бакноллу написано на C # .

...