Является ли использование статической очереди потокобезопасным? - PullRequest
9 голосов
/ 06 октября 2010

В документации msdn говорится, что статическая универсальная очередь является поточно-ориентированной. Означает ли это, что следующий код является поточно-ориентированным? Другими словами, есть ли проблема, когда поток ставит в очередь целое число, а другой поток одновременно удаляет целое число? Нужно ли блокировать операции Enqueue и Dequeue для обеспечения безопасности потоков?

class Test {
    public static Queue<int> queue = new Queue<int>(10000);

    Thread putIntThread;
    Thread takeIntThread;

    public Test() {
        for(int i = 0; i < 5000; ++i) {
            queue.Enqueue(0);
        }
        putIntThread = new Thread(this.PutInt);
        takeIntThread = new Thread(this.TakeInt);
        putIntThread.Start();
        takeIntThread.Start();
    }

    void PutInt() {
        while(true)
        {
            if(queue.Count < 10000) {//no need to lock here as only itself can change this condition
                queue.Enqueue(0);
            }
        }
    }

    void TakeInt() {
        while(true) {
            if(queue.Count > 0) {//no need to lock here as only itself can change this condition
                queue.Dequeue();
            }
        }
    }

}

Редактировать: я должен использовать .NET 3.5

Ответы [ 4 ]

23 голосов
/ 06 октября 2010

Это абсолютно не поточно-ориентированный.Из документов Queue<T>.

Открытые статические (Shared в Visual Basic) члены этого типа являются поточно-ориентированными.Ни один из членов экземпляра не гарантированно является потокобезопасным.

A Queue<T> может поддерживать несколько считывателей одновременно, если коллекция не изменена.Тем не менее, перечисление в коллекции по сути не является потокобезопасной процедурой.Чтобы гарантировать безопасность потоков во время перечисления, вы можете заблокировать коллекцию во время всего перечисления.Чтобы разрешить доступ к коллекции из нескольких потоков для чтения и записи, необходимо реализовать собственную синхронизацию.

Перечитывая свой вопрос, вы, похоже, не понимаете фразу «статические члены этого типа»- речь не идет о «статической очереди», поскольку такой вещи не существует.Объект не статичен или нет - член есть.Когда речь идет о статических членах, речь идет о таких вещах, как Encoding.GetEncoding (Queue<T> на самом деле не имеет статических членов).Члены экземпляра - это такие вещи, как Enqueue и Dequeue - члены, которые относятся к экземпляру типа, а не к самому типу.

Так что либо вам нужно использовать блокировку для каждого действия, либо если вы 'повторно используя .NET 4, используйте ConcurrentQueue<T>.

4 голосов
/ 06 октября 2010

Да, как уже говорилось, элемент экземпляра статического экземпляра уже не то же самое, что статический элемент, и только для последнего гарантируется безопасность потоков, поэтому вам нужно заблокировать операции enqueue и dequeue.

Если блокировка оказалась узким местом, очереди являются одной из более простых коллекций для записи без блокировок, если не требуется полная *Реализация 1005 * обеспечивается Queue<T>:

internal sealed class LockFreeQueue<T>
{
  private sealed class Node
  {
    public readonly T Item;
    public Node Next;
    public Node(T item)
    {
      Item = item;
    }
  }
  private volatile Node _head;
  private volatile Node _tail;
  public LockFreeQueue()
  {
    _head = _tail = new Node(default(T));
  }
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked
  public void Enqueue(T item)
  {
    Node newNode = new Node(item);
    for(;;)
    {
      Node curTail = _tail;
      if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)   //append to the tail if it is indeed the tail.
      {
        Interlocked.CompareExchange(ref _tail, newNode, curTail);   //CAS in case we were assisted by an obstructed thread.
        return;
      }
      else
      {
        Interlocked.CompareExchange(ref _tail, curTail.Next, curTail);  //assist obstructing thread.
      }
    }
  }    
  public bool TryDequeue(out T item)
  {
    for(;;)
    {
      Node curHead = _head;
      Node curTail = _tail;
      Node curHeadNext = curHead.Next;
      if (curHead == curTail)
      {
        if (curHeadNext == null)
        {
          item = default(T);
          return false;
        }
        else
          Interlocked.CompareExchange(ref _tail, curHeadNext, curTail);   // assist obstructing thread
      }
      else
      {
        item = curHeadNext.Item;
        if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
        {
          return true;
        }
      }
    }
  }
#pragma warning restore 420
}

Эта очередь имеет только методы Enqueue и TryDequeue (возвращает false, если очередь пуста).Добавление свойства Count с использованием взаимосвязанных приращений и уменьшений тривиально (убедитесь, что поле count читается в фактическом свойстве энергозависимо), но помимо этого становится довольно сложно добавить все, что не может быть записано как делегирование one членов уже определены или происходят во время построения (в этом случае у вас будет только один поток, использующий его в этот момент, если вы не сделаете что-то действительно странное).

Реализация также ожидает-free, как если бы действия одного потока не помешали другому прогрессировать (если поток находится на полпути в процедуре постановки в очередь, когда второй поток пытается это сделать, второй поток завершит работу первого потока).

Тем не менее, я бы подождал, пока блокировка на самом деле не оказалась узким местом (если только вы не экспериментируете; играйте с экзотикой, работайте со знакомыми).Действительно, во многих ситуациях это окажется более дорогостоящим, чем блокировка на Queue<T>, особенно потому, что не очень хорошо хранить элементы рядом друг с другом в памяти, так что вы можете обнаружить, что множество операций в последовательной последовательности были менее производительными для этого.причина.Блокировка, как правило, довольно дешевая, если не происходит частых конфликтов блокировок.

Редактировать:

Теперь у меня есть время добавить примечания о том, как работает вышеперечисленное.Я написал это, прочитав чужую версию той же идеи, написав ее для себя, чтобы скопировать идею, а затем сравнив с версией, которую я прочитал впоследствии, и нашел это очень информативным упражнением для этого.

Давайте начнем с реализации без блокировки.Это односвязный список.

internal sealed class NotLockFreeYetQueue<T>
{
  private sealed class Node
  {
    public readonly T Item;
    public Node Next{get;set;}
    public Node(T item)
    {
      Item = item;
    }
  }
  private Node _head;
  private Node _tail;
  public NotLockFreeYetQueue()
  {
    _head = _tail = new Node(default(T));
  }
  public void Enqueue(T item)
  {
    Node newNode = new Node(item);
    _tail.Next = newNode;
    _tail = newNode;
  }
  public bool TryDequeue(out T item)
  {
      if (_head == _tail)
      {
          item = default(T);
          return false;
      }
      else
      {
        item = _head.Next.Item;
        _head = _head.Next;
        return true;
      }
  }
}

Несколько замечаний по реализации.

Item и Next могут быть либо полями, либо свойствами.Так как это простой внутренний класс, и один из них должен быть readonly, а другой - «тупым» чтением-записью (без логики в геттере или сеттере), здесь действительно нечего выбирать.Я сделал Next свойством здесь исключительно потому, что это не сработает позже, и я хочу поговорить об этом, когда мы туда доберемся.

Наличие _head и _tail начинаются какуказание на страж, а не на null упрощает задачу, так как не требуется особый случай для пустой очереди.

Таким образом, постановка в очередь создаст новый узел и установит его как _tail 'Nextсобственность, прежде чем стать новым хвостом.Снятие очереди проверит пустоту и, если она не пуста, получит значение из головного узла и установит головку равной узлу, который был свойством Next старого заголовка.

Еще одна вещь, на которую следует обратить внимание в этой точке, так как новые узлы создаются по мере необходимости, а не в предварительно выделенном массиве, при нормальном использовании он будет иметь менее хорошую производительность, чем Queue<T>.Это не станет лучше, и все, что мы собираемся сделать сейчас, ухудшит производительность одного потока.Опять же, только в серьезном утверждении, что это побьет заблокированный Queue<T>.

Давайте сделаем очередь без блокировки.Мы будем использовать Interlocked.CompareExchange().Это сравнивает первый параметр с третьим параметром и устанавливает первый параметр, чтобы быть вторым параметром, если они равны.В любом случае он возвращает старое значение (было ли оно перезаписано или нет).Сравнение и обмен выполняются как элементарная операция, так что она сама по себе безопасна для потоков, но нам нужно немного больше работы, чтобы сделать комбинации таких операций также потокобезопасными.

CompareExchange и эквиваленты в других языках иногда сокращаются до CAS (для Compare-And-Swap).

Распространенный способ их использования - циклы, где мы сначала получаем значение, которое мы перезаписываем при обычном чтении (помните, что чтение .NET 32-битных значений, меньших значений и ссылочных типов всегда атомарное) и пробуем перезаписать его, если он не изменился, зацикливаясь, пока мы не добьемся успеха:

private sealed class Node
{
  public readonly T Item;
  public Node Next;
  public Node(T item)
  {
    Item = item;
  }
}
/* ... */
private volatile Node _tail;
/* ... */
public void Enqueue(T item)
{
  Node newNode = new Node(item);
  for(;;)
  {
    Node curTail = _tail;
    if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)
    {
      _tail = newNode;
      return;
    }
  }
}

Мы хотим добавить к хвосту Next, только если это null - если не другой поток, как написано в нем. Итак, мы делаем CAS, который будет успешным, только если это так. Если это так, мы устанавливаем _tail в качестве этого нового узла, в противном случае мы попробуем снова.

Далее нужно было изменить поле, чтобы это работало, мы не можем сделать это со свойствами. Мы также делаем _tail volatile, чтобы _tail было свежим во всех кэшах ЦП (CompareExchange имеет изменчивую семантику, поэтому она не будет нарушена из-за отсутствия волатильности, но может вращаться чаще, чем необходимо, и мы тоже будем делать больше с _tail).

Это без блокировки, но не без ожидания. Если поток достигал CAS, но еще не записал в _tail, а затем некоторое время не имел времени ЦП, все другие потоки, пытающиеся поставить в очередь, продолжали бы зацикливаться до тех пор, пока не были запланированы и не смогли это сделать. Если поток был прерван или приостановлен на долгое время, это вызвало бы постоянную живую блокировку.

Так что, если мы находимся в состоянии, когда CAS вышел из строя, мы находимся в такой ситуации. Мы можем исправить это, выполнив для этого работу другого потока:

  for(;;)
  {
    Node curTail = _tail;
    if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)
    {
      Interlocked.CompareExchange(ref _tail, newNode, curTail);   //CAS in case we were assisted by an obstructed thread.

      return;
    }
    else
    {
      Interlocked.CompareExchange(ref _tail, curTail.Next, curTail);  //assist obstructing thread.
    }
  }

Теперь в большинстве случаев поток, записавший в curTail.Next, назначит новый узел _tail - но через CAS на случай, если это уже сделано. Однако другой поток не может записать в curtail.Next, он может попытаться назначить curTail.Next для _tail, чтобы выполнить работу первого потока и перейти к своей собственной.

Итак, очередь без блокировки и без ожидания. Время работать над выгрузкой. Сначала давайте рассмотрим случай, когда мы не подозреваем, что очередь пуста. Как и при постановке в очередь, мы сначала получим локальные копии интересующих нас узлов; _head, _tail и _head.Next (опять же, использование пустой головы или хвоста для пустых очередей облегчает жизнь; это означает, что в любом состоянии безопасно читать _head.Next). Также как и с постановкой в ​​очередь, мы будем зависеть от волатильности, на этот раз не только от _tail, но _head, поэтому мы изменим его на:

private volatile Node _head;

И мы изменяем TryDequeue на:

  public bool TryDequeue(out T item)
  {
      Node curHead = _head;
      Node curTail = _tail;
      Node curHeadNext = curHead.Next;
      if (_head == _tail)
      {
          item = default(T);
          return false;
      }
      else
      {
        item = curHeadNext.Item;
        if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
          return true;
      }
  }

Случай с пустой очередью теперь неверен, но мы вернемся к этому. Безопасно установить элемент в curHeadNext.Item, так как если мы не завершим операцию, мы перезапишем ее снова, но мы должны сделать операцию записи в _head атомарной и гарантированно произойдет, только если _head не изменилась. Если этого не произошло, то _head был обновлен другим потоком, и мы можем выполнить цикл снова (нет необходимости работать с этим потоком, он уже сделал все, что на нас повлияет).

Теперь рассмотрим, что произойдет, если _head == _tail. Возможно, он пуст, но, возможно, _tail.Next (который будет таким же, как curHeadNext) был записан в очередь. В таком случае то, что мы, скорее всего, хотим, это не результат пустого запроса, а результат того, что мы удалили этот частично поставленный в очередь элемент. Итак, мы помогаем этому потоку и продолжаем цикл снова:

if (curHead == curTail)
{
    if (curHeadNext == null)
    {
        item = default(T);
        return false;
    }
    else
        Interlocked.CompareExchange(ref _tail, curHeadNext, curTail);
}

Наконец, единственная оставшаяся проблема заключается в том, что мы продолжаем получать 420 предупреждений, потому что мы передаем переменные поля методам byref. Это часто останавливает изменчивую семантику (отсюда и предупреждение), но не с CompareExchange (отсюда и наше). Мы можем отключить предупреждение, включая комментарий, чтобы объяснить, почему мы это сделали (я стараюсь никогда не отключать предупреждение без оправдательного комментария), и у нас есть код, который я дал ранее.

Обратите внимание, что для этого важно, чтобы мы делали это в рамках поддержки GC. Если бы нам пришлось обрабатывать освобождение, это было бы намного сложнее.

2 голосов
/ 06 октября 2010

Что указывает MSDN, так это то, что статические методы очереди являются поточно-ориентированными, а методы статического экземпляра - поточно-безопасными.

1 голос
/ 06 октября 2010

Да, вы должны заблокировать так же, как MSDN говорит

Чтобы разрешить доступ к коллекции из нескольких потоков для чтения и записи, необходимо реализовать собственную синхронизацию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...