Включение очереди <T>с параллелизмом - PullRequest
10 голосов
/ 29 декабря 2010

У меня есть предыдущий вопрос , который я предоставил для решения;однако у меня нет доступа к ConcurrentQueue<T>, так как я нахожусь на .Net 3.5.Мне нужно Queue<T>, чтобы разрешить параллелизм.Я прочитал этот вопрос и, кажется, представляет проблему, если элемент не в очереди, а многопоточный метод пытается удалить элемент из очереди.

Моя задача сейчас состоит в том, чтобы определить, могу ли я получить собственный класс параллельной очереди.Вот что я придумал:

public sealed class ConcurrentQueue : Queue<DataTable>
{
    public event EventHandler<TableQueuedEventArgs> TableQueued;
    private ICollection que;

    new public void Enqueue(DataTable Table)
    {
        lock (que.SyncRoot)
        {
            base.Enqueue(Table);
        }

        OnTableQueued(new TableQueuedEventArgs(Dequeue()));
    }

    //  this is where I think I will have a problem...
    new public DataTable Dequeue()
    {
        DataTable table;

        lock (que.SyncRoot)
        {
            table = base.Dequeue();
        }

        return table;
    }

    public void OnTableQueued(TableQueuedEventArgs table)
    {
        EventHandler<TableQueuedEventArgs> handler = TableQueued;

        if (handler != null)
        {
            handler(this, table);
        }
    }
}

Итак, когда DataTable ставится в очередь, EventArgs будет передавать таблицу из очереди в очередь подписчику события.Будет ли эта реализация предоставлять мне потокобезопасную очередь?

Ответы [ 5 ]

3 голосов
/ 29 декабря 2010

Тот факт, что вам нужно использовать new, чтобы скрыть методы от базового класса, обычно указывает на то, что вы должны использовать композицию, а не наследование ...

Вот простая синхронизированная очередь, которая не использует наследование, но все еще зависит от поведения стандарта Queue<T>:

public class ConcurrentQueue<T> : ICollection, IEnumerable<T>
{
    private readonly Queue<T> _queue;

    public ConcurrentQueue()
    {
        _queue = new Queue<T>();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (SyncRoot)
        {
            foreach (var item in _queue)
            {
                yield return item;
            }
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        lock (SyncRoot)
        {
            ((ICollection)_queue).CopyTo(array, index);
        }
    }

    public int Count
    {
        get
        { 
            // Assumed to be atomic, so locking is unnecessary
            return _queue.Count;
        }
    }

    public object SyncRoot
    {
        get { return ((ICollection)_queue).SyncRoot; }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public void Enqueue(T item)
    {
        lock (SyncRoot)
        {
            _queue.Enqueue(item);
        }
    }

    public T Dequeue()
    {
        lock(SyncRoot)
        {
            return _queue.Dequeue();
        }
    }

    public T Peek()
    {
        lock (SyncRoot)
        {
            return _queue.Peek();
        }
    }

    public void Clear()
    {
        lock (SyncRoot)
        {
            _queue.Clear();
        }
    }
}
3 голосов
/ 29 декабря 2010

Быстрое путешествие в мою любимую поисковую систему показало, что моя память верна; вы можете получить библиотеку параллелей задач даже в .NET 3.5 .Также см. Сообщение в блоге команды PFX на тему и Реактивные расширения , которые вы загружаете, чтобы получить желаемую System.Threading.dll.

2 голосов
/ 09 декабря 2011

Через некоторое время после первоначального вопроса я знаю (это прозвучало как «связанный» справа от другого вопроса), но в подобных случаях я ушел со следующим.Не так хорошо для использования кэша ЦП, как могло бы быть, но простое, без блокировок, поточно-ориентированное и часто использование кэша ЦП не так важно, если бы между операциями часто возникали большие разрывыне близость распределения может уменьшить влияние:

internal sealed class LockFreeQueue<T>
{
  private sealed class Node
  {
    public readonly T Item;
    public Node Next;
    public Node(T item)
    {
      Item = item;
    }
  }
  private volatile Node _head;
  private volatile Node _tail;
  public LockFreeQueue()
  {
    _head = _tail = new Node(default(T));
  }
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked
  public void Enqueue(T item)
  {
    Node newNode = new Node(item);
    for(;;)
    {
      Node curTail = _tail;
      if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)   //append to the tail if it is indeed the tail.
      {
        Interlocked.CompareExchange(ref _tail, newNode, curTail);   //CAS in case we were assisted by an obstructed thread.
        return;
      }
      else
      {
        Interlocked.CompareExchange(ref _tail, curTail.Next, curTail);  //assist obstructing thread.
      }
    }
  }    
  public bool TryDequeue(out T item)
  {
    for(;;)
    {
      Node curHead = _head;
      Node curTail = _tail;
      Node curHeadNext = curHead.Next;
      if (curHead == curTail)
      {
        if (curHeadNext == null)
        {
          item = default(T);
          return false;
        }
        else
          Interlocked.CompareExchange(ref _tail, curHeadNext, curTail);   // assist obstructing thread
      }
      else
      {
        item = curHeadNext.Item;
        if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
        {
          return true;
        }
      }
    }
  }
#pragma warning restore 420
}
2 голосов
/ 29 декабря 2010

Вы удаляете свои предметы из очереди, когда ставите их в очередь.
Вам необходимо поднять событие, используя ваш параметр.

От того, как вы его используете, зависит, насколько оно поточно-ориентированное.
Есливы когда-нибудь проверяли Count или проверяли на пустоту, он не потокобезопасен и не может быть легко сделан потокобезопасным.
Если нет, вы, вероятно, можете использовать что-то более простое, чем очередь.

0 голосов
/ 29 декабря 2010

В строке OnTableQueued(new TableQueuedEventArgs(Dequeue())); в вашем Enqueue методе

используйте Peek вместо Dequeue

Это должно быть

OnTableQueued(new TableQueuedEventArgs(base.Peek()));

...