Вы, вероятно, получите гораздо больше ускорения, если вместо того, чтобы производить и потреблять побайтово, вы будете работать кусками. В этом случае «чистота блокировки» кода, вероятно, не будет иметь значения вообще - на самом деле, традиционное решение по блокировке может быть предпочтительнее. Я постараюсь продемонстрировать.
Без блокировки, один производитель, один потребитель, ограниченная очередь задается в C #. (Листинг А)
Там нет никаких эзотерических операций с блокировкой, даже нет явных барьеров памяти. Скажем так, на первый взгляд он настолько быстр и без блокировок, насколько это возможно. Не так ли?
Теперь давайте сравним его с решением по блокировке , которое дал Марк Гравелл, здесь .
Мы будем использовать двухпроцессорную машину, которая не имеет общего кэша L3 между ядрами.
Мы ожидаем максимально 2-кратное ускорение. Ускорение в 2 раза действительно означало бы, что решение без блокировки работает идеально, в теоретических пределах.
Чтобы создать идеальную среду для кода без блокировки, мы даже установим сродство ЦП производителя и потока потребителя, используя служебный класс от здесь .
Полученный код теста находится в (Листинг B).
производит ок. 10 МБайт в одном потоке, а в другом -.
Размер очереди установлен в 32KBytes. Если он заполнен, производитель ждет.
Типичный прогон теста на моей машине выглядит так:
LockFreeByteQueue: 799мс
ByteQueue: 1843ms
Очередь без блокировки быстрее. Вау, это более чем в 2 раза быстрее! Это то, чем можно похвастаться. :)
Давайте посмотрим на то, что происходит.
Блокировка очереди Марка делает именно это. Это замки. Это делает это для каждого байта.
Нужно ли нам блокировать каждый байт и передавать данные байт за байтом? Он, несомненно, поступает в сети частями (например, около 1 тыс. Пакетов). Даже если он действительно поступает побайтово из внутреннего источника, производитель может легко упаковать его в красивые куски.
Давайте просто сделаем это - вместо того, чтобы производить и потреблять побайтные байты, давайте работать в чанках и добавим два других теста к микропроцессору (в листинге C просто вставьте его в тело теста).
Теперь типичный пробег выглядит так:
LockFreePageQueue: 33 мс
PageQueue: 25мс
Теперь оба они на самом деле в 20 раз быстрее, чем исходный код без блокировки - Решение Марка с добавленным чанком теперь на быстрее , чем код без блокировки с блоками !
Вместо использования структуры без блокировки, которая привела бы к ускорению в 2 раза, мы попытались найти другое решение, которое прекрасно работает с блокировкой и привело к ускорению в 20 раз (!).
Ключ ко многим проблемам не столько в том, чтобы избегать блокировок, сколько в том, чтобы избежать разделения и минимизировать блокировку. В приведенном выше случае мы можем избежать совместного использования во время байтового копирования.
Мы можем работать с частной структурой большую часть времени, а затем ставить в очередь один указатель, сокращая тем самым общее пространство и время до одной вставки одного указателя в очередь.
Листинг A, очередь без единого производителя, очередь с одним потребителем:
public class BoundedSingleProducerSingleConsumerQueue<T>
{
T[] queue;
volatile int tail;
volatile int head;
public BoundedSingleProducerSingleConsumerQueue(int capacity)
{
queue = new T[capacity + 1];
tail = head = 0;
}
public bool TryEnqueue(T item)
{
int newtail = (tail + 1) % queue.Length;
if (newtail == head) return false;
queue[tail] = item;
tail = newtail;
return true;
}
public bool TryDequeue(out T item)
{
item = default(T);
if (head == tail) return false;
item = queue[head];
queue[head] = default(T);
head = (head + 1) % queue.Length;
return true;
}
}
Листинг B, микро-тест:
class Program
{
static void Main(string[] args)
{
for (int numtrials = 3; numtrials > 0; --numtrials)
{
using (ProcessorAffinity.BeginAffinity(0))
{
int pagesize = 1024 * 10;
int numpages = 1024;
int totalbytes = pagesize * numpages;
BoundedSingleProducerSingleConsumerQueue<byte> lockFreeByteQueue = new BoundedSingleProducerSingleConsumerQueue<byte>(1024 * 32);
Stopwatch sw = new Stopwatch();
sw.Start();
ThreadPool.QueueUserWorkItem(delegate(object state)
{
using (ProcessorAffinity.BeginAffinity(1))
{
for (int i = 0; i < totalbytes; i++)
{
while (!lockFreeByteQueue.TryEnqueue((byte)(i & 0xFF))) ;
}
}
});
for (int i = 0; i < totalbytes; i++)
{
byte tmp;
while (!lockFreeByteQueue.TryDequeue(out tmp)) ;
}
sw.Stop();
Console.WriteLine("LockFreeByteQueue: {0}ms", sw.ElapsedMilliseconds);
SizeQueue<byte> byteQueue = new SizeQueue<byte>(1024 * 32);
sw.Reset();
sw.Start();
ThreadPool.QueueUserWorkItem(delegate(object state)
{
using (ProcessorAffinity.BeginAffinity(1))
{
for (int i = 0; i < totalbytes; i++)
{
byteQueue.Enqueue((byte)(i & 0xFF));
}
}
});
for (int i = 0; i < totalbytes; i++)
{
byte tmp = byteQueue.Dequeue();
}
sw.Stop();
Console.WriteLine("ByteQueue: {0}ms", sw.ElapsedMilliseconds);
Console.ReadKey();
}
}
}
}
Листинг C, кусочные тесты:
BoundedSingleProducerSingleConsumerQueue<byte[]> lockfreePageQueue = new BoundedSingleProducerSingleConsumerQueue<byte[]>(32);
sw.Reset();
sw.Start();
ThreadPool.QueueUserWorkItem(delegate(object state)
{
using (ProcessorAffinity.BeginAffinity(1))
{
for (int i = 0; i < numpages; i++)
{
byte[] page = new byte[pagesize];
for (int j = 0; j < pagesize; j++)
{
page[j] = (byte)(i & 0xFF);
}
while (!lockfreePageQueue.TryEnqueue(page)) ;
}
}
});
for (int i = 0; i < numpages; i++)
{
byte[] page;
while (!lockfreePageQueue.TryDequeue(out page)) ;
for (int j = 0; j < pagesize; j++)
{
byte tmp = page[j];
}
}
sw.Stop();
Console.WriteLine("LockFreePageQueue: {0}ms", sw.ElapsedMilliseconds);
SizeQueue<byte[]> pageQueue = new SizeQueue<byte[]>(32);
ThreadPool.QueueUserWorkItem(delegate(object state)
{
using (ProcessorAffinity.BeginAffinity(1))
{
for (int i = 0; i < numpages; i++)
{
byte[] page = new byte[pagesize];
for (int j = 0; j < pagesize; j++)
{
page[j] = (byte)(i & 0xFF);
}
pageQueue.Enqueue(page);
}
}
});
sw.Reset();
sw.Start();
for (int i = 0; i < numpages; i++)
{
byte[] page = pageQueue.Dequeue();
for (int j = 0; j < pagesize; j++)
{
byte tmp = page[j];
}
}
sw.Stop();
Console.WriteLine("PageQueue: {0}ms", sw.ElapsedMilliseconds);