Запрос на пересмотр: потокобезопасная очередь (параллельное принудительное нажатие) - PullRequest
2 голосов
/ 10 ноября 2010

Вот реализация потоковой очереди.Пуш и поп не будут блокировать друг друга.Тем не менее, pop будет ждать, пока элемент не будет передан, если очередь пуста.Несколько производителей и потребителей могут использовать очередь.Пожалуйста, сообщите мне, если вы видите какие-либо проблемы.

Обновление: отредактировано согласно ответам 1) Устранена проблема с ситуацией «Очередь заполнена»2) В .NET4 есть BlockingCollection<T> и ConcurrentQueue<T>.Таким образом, нет необходимости изобретать велосипед (для .NET4)

public class CustomQueue<T> where T: class
{
    class Node
    {
        public Node()
        {
            Value = null;
            NextNode = null;
        }

        public Node(T value)
        {
            Value = value;
            NextNode = null;
        }

        public T Value;
        public Node NextNode;
    }

    object PushLocker = new object();
    object PopLocker = new object();
    Semaphore QueueSemaphore;
    volatile int PushIncrement;
    volatile int PopIncrement;
    int MaxItems;
    Node FirstNode;
    Node LastNode;

    public CustomQueue(int maxItems)
    {
        QueueSemaphore = new Semaphore(0, maxItems);
        MaxItems = maxItems;
        FirstNode = LastNode = new Node();
        PushIncrement = 0;
        PopIncrement = 0;
    }

    public int Size()
    {
        return PushIncrement - PopIncrement;
    }

    public bool Push(T value)
    {
        lock(PushLocker)
        {
            if((Size()) >= MaxItems)
            {
                lock(PopLocker)
                {
                    PushIncrement = PushIncrement - PopIncrement;
                    PopIncrement = 0;
                    return false;
                }
            }
            Node newNode = new Node(value);                
            LastNode.NextNode = newNode;
            LastNode = newNode;
            PushIncrement++;
            QueueSemaphore.Release();
            return true;
        }            
    }

    public T Pop()
    {
        QueueSemaphore.WaitOne();
        lock(PopLocker)
        {
            Node tempFirst = FirstNode;
            Node tempNext = FirstNode.NextNode;
            T value = tempNext.Value;
            tempNext.Value = null;
            FirstNode = tempNext;
            PopIncrement++;
            return value;
        }
    }
}

Ответы [ 5 ]

3 голосов
/ 10 ноября 2010

Это выглядит как хорошая реализация с первого взгляда. Использование разных блокировок всегда является для меня красным флагом, поэтому я внимательно посмотрел на некоторые крайних случаев, включающих одновременные вызовы Pop и Push, и это кажется безопасным. Я подозреваю, что вы, вероятно, узнали о реализации связного списка очереди блокировки, а? Причина, по которой это безопасно, заключается в том, что вы всегда ссылаетесь только на LastNode из Push и FirstNode из Pop, иначе весь трюк развалится.

Единственное, что бросается в глаза прямо сейчас, это то, что когда вы пытаетесь освободить счет от Semaphore, он выдаст исключение, если он уже заполнен, так что вы можете принять меры против этого. 1 В противном случае вы получите дополнительные узлы в связанном списке, и очередь будет 1) иметь больше максимального количества элементов и 2) она будет заблокирована в реальном времени.

Обновление:

Я еще об этом подумал. Этот Release вызов в методе Push будет очень проблематичным. Я вижу только одно возможное решение. Удалите параметр maxItems из конструктора и разрешите семафору считать до Int.MaxValue. В противном случае вам придется радикально изменить свой подход, в результате чего будет реализована реализация, которая не будет близка к той, которая у вас сейчас есть.

1 Я думаю, вы поймете, что это будет сложнее, чем вы могли бы сразу понять.

3 голосов
/ 10 ноября 2010

Если вы делаете это для самообразования, отлично - иначе BlockingCollection<T> или ConcurrentQueue<T> - хорошие альтернативы.

Одна проблема, которую я вижуздесь нет способа прервать Pop после его запуска - он предполагает, что объект ожидает, когда пробуждается.Как бы вы объяснили это при прекращении?TryPop, который возвращает true (с элементом) или false (если нет данных), может быть лучше, тогда вы могли бы сигнализировать о чистом завершении ожидающих потоков после опустошения очереди.

3 голосов
/ 10 ноября 2010

1. Рассмотрите возможность добавления второго конструктора Node:

        public Node(T value)
        {
            Value = value;
        }

, тогда ваш код клиента:

            Node newNode = new Node();
            newNode.Value = value;

может рассматривать значение как инвариант:

            Node newNode = new Node(value);

2. Затем сделайте ваши открытые поля:

        public T Value;
        public Node NextNode;

в свойствах авто:

        public T Value { get; private set; };
        public Node NextNode { get; set; };

Таким образом, вы можете абстрагировать использование от реализации и добавить проверку, другую обработку,и т.д. по факту с минимальным нарушением работы клиентского кода.

2 голосов
/ 10 ноября 2010
0 голосов
/ 11 ноября 2010

Поскольку я фанат неизменных объектов, вот альтернатива моему более раннему ответу, который я бы посчитал чище:

public sealed class CustomQueue<T> where T : class
{
    private readonly object pushLocker = new object();
    private readonly object popLocker = new object();
    private readonly Semaphore queueSemaphore;
    private readonly int maxItems;
    private volatile int pushIncrement;
    private volatile int popIncrement;
    private Node firstNode = new Node();
    private Node lastNode;

    public CustomQueue(int maxItems)
    {
        this.maxItems = maxItems;
        this.lastNode = this.firstNode;
        this.queueSemaphore = new Semaphore(0, this.maxItems);
    }

    public int Size
    {
        get
        {
            return this.pushIncrement - this.popIncrement;
        }
    }

    public bool Push(T value)
    {
        lock (this.pushLocker)
        {
            if (this.Size >= this.maxItems)
            {
                lock (this.popLocker)
                {
                    this.pushIncrement = this.pushIncrement - this.popIncrement;
                    this.popIncrement = 0;
                    return false;
                }
            }

            Node newNode = new Node(value, this.lastNode.NextNode);

            this.lastNode = new Node(this.lastNode.Value, newNode);
            this.firstNode = new Node(null, newNode);
            this.pushIncrement++;
            this.queueSemaphore.Release();
            return true;
        }
    }

    public T Pop()
    {
        this.queueSemaphore.WaitOne();
        lock (this.popLocker)
        {
            Node tempNext = this.firstNode.NextNode;
            T value = tempNext.Value;

            this.firstNode = tempNext;
            this.popIncrement++;
            return value;
        }
    }

    private sealed class Node
    {
        private readonly T value;

        private readonly Node nextNode;

        public Node()
        {
        }

        public Node(T value, Node nextNode)
        {
            this.value = value;
            this.nextNode = nextNode;
        }

        public T Value
        {
            get
            {
                return this.value;
            }
        }

        public Node NextNode
        {
            get
            {
                return this.nextNode;
            }
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...