Да, как уже говорилось, элемент экземпляра статического экземпляра уже не то же самое, что статический элемент, и только для последнего гарантируется безопасность потоков, поэтому вам нужно заблокировать операции enqueue и dequeue.
Если блокировка оказалась узким местом, очереди являются одной из более простых коллекций для записи без блокировок, если не требуется полная *Реализация 1005 * обеспечивается Queue<T>
:
internal sealed class LockFreeQueue<T>
{
private sealed class Node
{
public readonly T Item;
public Node Next;
public Node(T item)
{
Item = item;
}
}
private volatile Node _head;
private volatile Node _tail;
public LockFreeQueue()
{
_head = _tail = new Node(default(T));
}
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked
public void Enqueue(T item)
{
Node newNode = new Node(item);
for(;;)
{
Node curTail = _tail;
if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) //append to the tail if it is indeed the tail.
{
Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread.
return;
}
else
{
Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread.
}
}
}
public bool TryDequeue(out T item)
{
for(;;)
{
Node curHead = _head;
Node curTail = _tail;
Node curHeadNext = curHead.Next;
if (curHead == curTail)
{
if (curHeadNext == null)
{
item = default(T);
return false;
}
else
Interlocked.CompareExchange(ref _tail, curHeadNext, curTail); // assist obstructing thread
}
else
{
item = curHeadNext.Item;
if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
{
return true;
}
}
}
}
#pragma warning restore 420
}
Эта очередь имеет только методы Enqueue
и TryDequeue
(возвращает false, если очередь пуста).Добавление свойства Count с использованием взаимосвязанных приращений и уменьшений тривиально (убедитесь, что поле count читается в фактическом свойстве энергозависимо), но помимо этого становится довольно сложно добавить все, что не может быть записано как делегирование one членов уже определены или происходят во время построения (в этом случае у вас будет только один поток, использующий его в этот момент, если вы не сделаете что-то действительно странное).
Реализация также ожидает-free, как если бы действия одного потока не помешали другому прогрессировать (если поток находится на полпути в процедуре постановки в очередь, когда второй поток пытается это сделать, второй поток завершит работу первого потока).
Тем не менее, я бы подождал, пока блокировка на самом деле не оказалась узким местом (если только вы не экспериментируете; играйте с экзотикой, работайте со знакомыми).Действительно, во многих ситуациях это окажется более дорогостоящим, чем блокировка на Queue<T>
, особенно потому, что не очень хорошо хранить элементы рядом друг с другом в памяти, так что вы можете обнаружить, что множество операций в последовательной последовательности были менее производительными для этого.причина.Блокировка, как правило, довольно дешевая, если не происходит частых конфликтов блокировок.
Редактировать:
Теперь у меня есть время добавить примечания о том, как работает вышеперечисленное.Я написал это, прочитав чужую версию той же идеи, написав ее для себя, чтобы скопировать идею, а затем сравнив с версией, которую я прочитал впоследствии, и нашел это очень информативным упражнением для этого.
Давайте начнем с реализации без блокировки.Это односвязный список.
internal sealed class NotLockFreeYetQueue<T>
{
private sealed class Node
{
public readonly T Item;
public Node Next{get;set;}
public Node(T item)
{
Item = item;
}
}
private Node _head;
private Node _tail;
public NotLockFreeYetQueue()
{
_head = _tail = new Node(default(T));
}
public void Enqueue(T item)
{
Node newNode = new Node(item);
_tail.Next = newNode;
_tail = newNode;
}
public bool TryDequeue(out T item)
{
if (_head == _tail)
{
item = default(T);
return false;
}
else
{
item = _head.Next.Item;
_head = _head.Next;
return true;
}
}
}
Несколько замечаний по реализации.
Item
и Next
могут быть либо полями, либо свойствами.Так как это простой внутренний класс, и один из них должен быть readonly
, а другой - «тупым» чтением-записью (без логики в геттере или сеттере), здесь действительно нечего выбирать.Я сделал Next
свойством здесь исключительно потому, что это не сработает позже, и я хочу поговорить об этом, когда мы туда доберемся.
Наличие _head
и _tail
начинаются какуказание на страж, а не на null
упрощает задачу, так как не требуется особый случай для пустой очереди.
Таким образом, постановка в очередь создаст новый узел и установит его как _tail
'Next
собственность, прежде чем стать новым хвостом.Снятие очереди проверит пустоту и, если она не пуста, получит значение из головного узла и установит головку равной узлу, который был свойством Next
старого заголовка.
Еще одна вещь, на которую следует обратить внимание в этой точке, так как новые узлы создаются по мере необходимости, а не в предварительно выделенном массиве, при нормальном использовании он будет иметь менее хорошую производительность, чем Queue<T>
.Это не станет лучше, и все, что мы собираемся сделать сейчас, ухудшит производительность одного потока.Опять же, только в серьезном утверждении, что это побьет заблокированный Queue<T>
.
Давайте сделаем очередь без блокировки.Мы будем использовать Interlocked.CompareExchange()
.Это сравнивает первый параметр с третьим параметром и устанавливает первый параметр, чтобы быть вторым параметром, если они равны.В любом случае он возвращает старое значение (было ли оно перезаписано или нет).Сравнение и обмен выполняются как элементарная операция, так что она сама по себе безопасна для потоков, но нам нужно немного больше работы, чтобы сделать комбинации таких операций также потокобезопасными.
CompareExchange и эквиваленты в других языках иногда сокращаются до CAS (для Compare-And-Swap).
Распространенный способ их использования - циклы, где мы сначала получаем значение, которое мы перезаписываем при обычном чтении (помните, что чтение .NET 32-битных значений, меньших значений и ссылочных типов всегда атомарное) и пробуем перезаписать его, если он не изменился, зацикливаясь, пока мы не добьемся успеха:
private sealed class Node
{
public readonly T Item;
public Node Next;
public Node(T item)
{
Item = item;
}
}
/* ... */
private volatile Node _tail;
/* ... */
public void Enqueue(T item)
{
Node newNode = new Node(item);
for(;;)
{
Node curTail = _tail;
if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)
{
_tail = newNode;
return;
}
}
}
Мы хотим добавить к хвосту Next
, только если это null
- если не другой поток, как написано в нем. Итак, мы делаем CAS, который будет успешным, только если это так. Если это так, мы устанавливаем _tail
в качестве этого нового узла, в противном случае мы попробуем снова.
Далее нужно было изменить поле, чтобы это работало, мы не можем сделать это со свойствами. Мы также делаем _tail
volatile
, чтобы _tail
было свежим во всех кэшах ЦП (CompareExchange
имеет изменчивую семантику, поэтому она не будет нарушена из-за отсутствия волатильности, но может вращаться чаще, чем необходимо, и мы тоже будем делать больше с _tail
).
Это без блокировки, но не без ожидания. Если поток достигал CAS, но еще не записал в _tail, а затем некоторое время не имел времени ЦП, все другие потоки, пытающиеся поставить в очередь, продолжали бы зацикливаться до тех пор, пока не были запланированы и не смогли это сделать. Если поток был прерван или приостановлен на долгое время, это вызвало бы постоянную живую блокировку.
Так что, если мы находимся в состоянии, когда CAS вышел из строя, мы находимся в такой ситуации. Мы можем исправить это, выполнив для этого работу другого потока:
for(;;)
{
Node curTail = _tail;
if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)
{
Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread.
return;
}
else
{
Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread.
}
}
Теперь в большинстве случаев поток, записавший в curTail.Next
, назначит новый узел _tail
- но через CAS на случай, если это уже сделано. Однако другой поток не может записать в curtail.Next
, он может попытаться назначить curTail.Next
для _tail
, чтобы выполнить работу первого потока и перейти к своей собственной.
Итак, очередь без блокировки и без ожидания. Время работать над выгрузкой. Сначала давайте рассмотрим случай, когда мы не подозреваем, что очередь пуста. Как и при постановке в очередь, мы сначала получим локальные копии интересующих нас узлов; _head
, _tail
и _head.Next
(опять же, использование пустой головы или хвоста для пустых очередей облегчает жизнь; это означает, что в любом состоянии безопасно читать _head.Next
). Также как и с постановкой в очередь, мы будем зависеть от волатильности, на этот раз не только от _tail
, но _head
, поэтому мы изменим его на:
private volatile Node _head;
И мы изменяем TryDequeue на:
public bool TryDequeue(out T item)
{
Node curHead = _head;
Node curTail = _tail;
Node curHeadNext = curHead.Next;
if (_head == _tail)
{
item = default(T);
return false;
}
else
{
item = curHeadNext.Item;
if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
return true;
}
}
Случай с пустой очередью теперь неверен, но мы вернемся к этому. Безопасно установить элемент в curHeadNext.Item
, так как если мы не завершим операцию, мы перезапишем ее снова, но мы должны сделать операцию записи в _head
атомарной и гарантированно произойдет, только если _head
не изменилась. Если этого не произошло, то _head
был обновлен другим потоком, и мы можем выполнить цикл снова (нет необходимости работать с этим потоком, он уже сделал все, что на нас повлияет).
Теперь рассмотрим, что произойдет, если _head == _tail
. Возможно, он пуст, но, возможно, _tail.Next
(который будет таким же, как curHeadNext
) был записан в очередь. В таком случае то, что мы, скорее всего, хотим, это не результат пустого запроса, а результат того, что мы удалили этот частично поставленный в очередь элемент. Итак, мы помогаем этому потоку и продолжаем цикл снова:
if (curHead == curTail)
{
if (curHeadNext == null)
{
item = default(T);
return false;
}
else
Interlocked.CompareExchange(ref _tail, curHeadNext, curTail);
}
Наконец, единственная оставшаяся проблема заключается в том, что мы продолжаем получать 420 предупреждений, потому что мы передаем переменные поля методам byref
. Это часто останавливает изменчивую семантику (отсюда и предупреждение), но не с CompareExchange
(отсюда и наше). Мы можем отключить предупреждение, включая комментарий, чтобы объяснить, почему мы это сделали (я стараюсь никогда не отключать предупреждение без оправдательного комментария), и у нас есть код, который я дал ранее.
Обратите внимание, что для этого важно, чтобы мы делали это в рамках поддержки GC. Если бы нам пришлось обрабатывать освобождение, это было бы намного сложнее.