мышление в очереди сообщений - PullRequest
5 голосов
/ 18 января 2011

Мы реализовали очередь сообщений с помощью C # Queue.Мы знаем, что у нас только ОДИН потребитель, который может вывести доступное сообщение из очереди для обработки с циклом while.Мы также знаем, что есть только ОДИН производитель, чтобы поместить сообщение в очередь.

У нас есть lock в указанной выше очереди сообщений, чтобы гарантировать, что потребитель и производитель не могут получить доступ к очереди одновременно.

Мой вопрос: lock необходимо?Если Queue увеличивает свое свойство Count ПОСЛЕ , элемент фактически добавляется, и если потребитель проверяет Count перед извлечением, потребитель должен получить полный элемент сообщения, даже если у нас его нетlock.Правильно?Поэтому мы не столкнемся с частичной проблемой сообщения.Тогда мы можем избавиться от этого lock?

Это lock замедлит работу системы, и иногда мы можем увидеть, что поток извлечения заблокирован на некоторое время, потому что у нас очень тяжелый производитель.

РЕДАКТИРОВАТЬ:

К сожалению, мы используем .Net 3.5.

Ответы [ 7 ]

10 голосов
/ 18 января 2011

Реальная проблема заключается в том, что внутренние структуры данных очереди могут быть видоизменены, пока возникает очередь или очередь, и в течение этого периода структура данных находится в неопределенном состоянии с точки зрения другого потока.

Например, для очереди может потребоваться расширение внутренних структур данных, когда создается новая структура, а старые элементы копируются из старой структуры в новую структуру.В этом процессе будет много этапов, когда в любой момент другому потоку будет опасно получить доступ к очереди, поскольку операции не завершены.

Поэтому во время постановки в очередь \ удаления вы должны заблокировать, чтобы эти операции выглядели логически атомарно.

Вы можете попробовать новый класс ConcurrentQueue в .Net 4.0возможно, он имеет лучшие характеристики производительности, поскольку использует алгоритм без блокировки.

4 голосов
/ 18 января 2011

Блокировка необходима, если вы используете Queue<T>.Вы можете легко удалить это, заменив это на ConcurrentQueue<T>, однако, вы можете подумать об упрощении этого кода, заменив его на BlockingCollection<T>.

Это позволит вашему потребителюснять блокировку и при проверке, и просто сделать один foreach на collection.GetConsumingEnumerable().Производитель может снять блокировку и добавить предметы по мере необходимости.Это также позволит вам легко использовать несколько производителей, поскольку вы упомянули, что у вас сейчас «очень тяжелый производитель».

2 голосов
/ 18 января 2011
1 голос
/ 19 января 2011

Кстати, очереди без блокировок для одного читателя и одного писателя довольно легко написать.Это очень примитивный пример концепции, но он выполняет свою работу:

class LocklessQueue<T>
{
    class Item
    {
        public Item Next;
        bool _valid;
        T _value;
        public Item(bool valid, T value)
        {
            _valid = valid;
            _value = value;
            Next = null;
        }
        public bool IsValid { get { return _valid; } }
        public T TakeValue()
        {
            T value = _value;
            _valid = false;
            _value = default(T);
            return value;
        }
    }

    Item _first;
    Item _last;

    public LocklessQueue()
    {
        _first = _last = new Item(false, default(T));
    }

    public bool IsEmpty
    { 
        get
        {
            while (!_first.IsValid && _first.Next != null)
                _first = _first.Next;
            return false == _first.IsValid;
        }
    }

    public void Enqueue(T value)
    {
        Item i = new Item(true, value);
        _last.Next = i;
        _last = i;
    }

    public T Dequeue()
    {
        while (!_first.IsValid && _first.Next != null)
            _first = _first.Next;

        if (IsEmpty)
            throw new InvalidOperationException();//queue is empty

        return _first.TakeValue();
    }
}
1 голос
/ 18 января 2011

Нет, это не будет работать последовательно ... почему?

Давайте разберем два метода, которые мы будем вызывать одновременно из двух потоков (один для чтения и один для записи):

public T Dequeue()
{
    if (this._size == 0)
    {
        ThrowHelper.ThrowInvalidOperationException(ExceptionResource.InvalidOperation_EmptyQueue);
    }
    T local = this._array[this._head];
    this._array[this._head] = default(T);
    this._head = (this._head + 1) % this._array.Length;
    this._size--;
    this._version++;
    return local;
}

public void Enqueue(T item)
{
    if (this._size == this._array.Length)
    {
        int capacity = (int) ((this._array.Length * 200L) / 100L);
        if (capacity < (this._array.Length + 4))
        {
            capacity = this._array.Length + 4;
        }
        this.SetCapacity(capacity);
    }
    this._array[this._tail] = item;
    this._tail = (this._tail + 1) % this._array.Length;
    this._size++;
    this._version++;
}

Учитывая приведенный выше код, три переменные являются безопасными, если (и только если) имеется достаточная емкость в очереди.поля для _array, _head и _tail либо не изменены, либо изменены только одним из двух указанных выше методов.

Причина, по которой вы не можете удалить lock (), заключается в том, что оба метода изменяют _size и _version.Хотя, возможно, столкновение с _version можно игнорировать, столкновение с _size вызовет нежелательное и непредсказуемое поведение.

0 голосов
/ 18 января 2011

ConcurrentQueue доступно, даже если вы используете .NET 3.5. Reactive Extensions включает в себя то, что раньше было параллельными расширениями до .NET 3.5 - предшественник параллельной библиотеки задач, входящей в .NET 4.0.

0 голосов
/ 18 января 2011

Вы должны быть Locking; класс не является потокобезопасным. Если вы используете Queue в System.Collections, есть поточный Queue удобный (System.Collections.Queue.Synchronized() возвращает такое Queue) В противном случае обязательно используйте предоставленный объект Queue<T>.SyncRoot для синхронизации:

using System.Collections.Generic;
public static class Q_Example
{
    private readonly Queue<int> q = new Queue<int>();
    public void Method1(int val)
    {
        lock(q.SyncRoot)
        {
            q.EnQueue(val);
        }
    }

    public int Method2()
    {
        lock(q.SyncRoot)
        {
            return q.Dequeue();
        }
    }
}
...