Создание очереди блокировки <T>в .NET? - PullRequest
157 голосов
/ 10 февраля 2009

У меня есть сценарий, в котором несколько потоков добавляются в очередь, а несколько потоков читают из одной очереди. Если очередь достигает определенного размера , все потоки , заполняющие очередь, будут заблокированы при добавлении до тех пор, пока элемент не будет удален из очереди.

Ниже приведено решение, которое я использую сейчас, и мой вопрос: как это можно улучшить? Есть ли объект, который уже разрешает такое поведение в BCL, который я должен использовать?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}

Ответы [ 9 ]

195 голосов
/ 10 февраля 2009

Это выглядит очень небезопасно (очень мало синхронизации); как насчет чего-то вроде:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(редактировать)

На самом деле вам нужен способ закрыть очередь, чтобы читатели начинали выходить чисто - возможно, что-то вроде флага bool - если установлен, пустая очередь просто возвращается (а не блокируется):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
53 голосов
/ 09 декабря 2011

Используйте .net 4 BlockingCollection, для постановки в очередь используйте Add (), для снятия в очередь используйте Take (). Он внутренне использует неблокирующую ConcurrentQueue. Более подробная информация здесь Быстрая и лучшая техника очереди производителей / потребителей BlockingCollection vs одновременная очередь

14 голосов
/ 10 февраля 2009

«Как это можно улучшить?»

Что ж, вам нужно посмотреть на каждый метод в вашем классе и подумать, что произойдет, если другой поток одновременно вызовет этот метод или любой другой метод. Например, вы устанавливаете блокировку в методе Remove, но не в методе Add. Что происходит, если один поток добавляет одновременно с удалением другого потока? Плохие вещи.

Также учтите, что метод может возвращать второй объект, который обеспечивает доступ к внутренним данным первого объекта, например, GetEnumerator. Представьте, что один поток проходит через этот перечислитель, другой поток одновременно изменяет список. Не хорошо.

Хорошее эмпирическое правило состоит в том, чтобы упростить эту задачу, чтобы сократить число методов в классе до абсолютного минимума.

В частности, не наследуйте другой контейнерный класс, потому что вы будете выставлять все методы этого класса, предоставляя вызывающей стороне способ повредить внутренние данные или увидеть частично завершенные изменения данных (так же плохо, потому что данные кажутся поврежденными в этот момент). Скройте все детали и будьте совершенно безжалостны о том, как вы разрешаете доступ к ним.

Я бы настоятельно рекомендовал вам использовать готовые решения - получить книгу о многопоточности или использовать стороннюю библиотеку. В противном случае, учитывая то, что вы пытаетесь, вы будете долго отлаживать свой код.

Кроме того, не имеет ли смысла для Remove возвращать элемент (скажем, тот, который был добавлен первым, поскольку он является очередью), а не вызывающий объект, выбирающий конкретный элемент? И когда очередь пуста, возможно, удаление также должно блокировать.

Обновление: ответ Марка фактически реализует все эти предложения! :) Но я оставлю это здесь, так как может быть полезно понять, почему его версия является таким улучшением.

8 голосов
/ 06 февраля 2013

Вы можете использовать BlockingCollection и ConcurrentQueue в пространстве имен System.Collections.Concurrent

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
    /// </summary>
    public ProducerConsumerQueue()  
        : base(new ConcurrentQueue<T>())
    {
    }

  /// <summary>
  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
  /// </summary>
  /// <param name="maxSize"></param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }



}
6 голосов
/ 07 мая 2010

Я только что выбрал это с помощью Reactive Extensions и вспомнил этот вопрос:

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

Не обязательно полностью безопасно, но очень просто.

5 голосов
/ 28 мая 2009

Это то, что я пришел, чтобы создать потокобезопасную ограниченную очередь блокировки.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);

        }
        sema_NotEmpty.Release();


    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {

            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);

        }
        sema_NotFull.Release();

        return item;
    }
}
2 голосов
/ 10 февраля 2009

Я не полностью изучил TPL , но у них может быть что-то, что соответствует вашим потребностям, или, по крайней мере, какой-нибудь корм для рефлекторов, чтобы черпать вдохновение.

Надеюсь, это поможет.

0 голосов
/ 10 февраля 2009

Ну, вы можете посмотреть на System.Threading.Semaphore класс. Кроме этого - нет, вы должны сделать это самостоятельно. AFAIK нет такой встроенной коллекции.

0 голосов
/ 10 февраля 2009

Если вам нужна максимальная пропускная способность, позволяющая читать нескольким читателям и писать только одному писателю, BCL имеет нечто, называемое ReaderWriterLockSlim, которое должно помочь уменьшить ваш код ...

...