Атомно все берет из ConcurrentQueue - PullRequest
0 голосов
/ 11 февраля 2020

У меня есть несколько потоков, генерирующих элементы и вставляющих их в общий ConcurrentQueue:

private ConcurrentQueue<GeneratedItem> queuedItems = new ConcurrentQueue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    queuedItems.Enqueue(new GeneratedItem(...));
    // ...
}

У меня есть еще один отдельный потребительский поток, но время от времени он должен работать в контексте этого приложения, иногда , ему просто нужно захватить все , находящееся в данный момент в очереди потоков, удалив его из этой очереди, все за один выстрел. Что-то вроде:

private Queue<GeneratedItem> GetAllNewItems () {

    return queuedItems.TakeEverything(); // <-- not a real method

}

Я думаю, что я просмотрел всю документацию (для коллекции и ее реализованных интерфейсов), но я не нашел ничего похожего на «одновременно отбирать все объекты из очереди», или даже «одновременно поменять содержимое с другой очередью».

Я мог бы сделать это без проблем, если бы угробил ConcurrentQueue и просто защитил нормальный Queue с помощью lock, например:

private Queue<GeneratedItem> queuedItems = new Queue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    lock (queuedItems) {
        queuedItems.Enqueue(new GeneratedItem(...));
    }
    // ...
}

private Queue<GeneratedItem> GetAllNewItems () {

    lock (queuedItems) {
        Queue<GeneratedItem> newItems = new Queue<Event>(queuedItems);
        queuedItems.Clear();
        return newItems;
    }

}

Но мне нравится удобство ConcurrentQueue, а также, поскольку я только учусь C#, мне интересно узнать об API; поэтому мой вопрос: есть ли способ сделать это с одной из одновременных коллекций?

Возможно, есть какой-нибудь способ получить доступ к любому объекту синхронизации, используемому ConcurrentQueue, и заблокировать его для себя в своих целях, чтобы все хорошо играет вместе? Тогда я могу заблокировать его, взять все и отпустить?

Ответы [ 2 ]

2 голосов
/ 11 февраля 2020

Это зависит от того, что вы хотите сделать. Согласно комментариям в исходном коде

//number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot.

Это работает внутренним вызовом ToList () , который, в свою очередь, работает на m_numSnapshotTakers и spin механизм

/// Copies the <see cref="ConcurrentQueue{T}"/> elements to a new <see
/// cref="T:System.Collections.Generic.List{T}"/>.
/// </summary>
/// <returns>A new <see cref="T:System.Collections.Generic.List{T}"/> containing a snapshot of
/// elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
private List<T> ToList()
{
   // Increments the number of active snapshot takers. This increment must happen before the snapshot is 
   // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
   // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. 
   Interlocked.Increment(ref m_numSnapshotTakers);

   List<T> list = new List<T>();
   try
   {
       //store head and tail positions in buffer, 
       Segment head, tail;
       int headLow, tailHigh;
       GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);

       if (head == tail)
       {
           head.AddToList(list, headLow, tailHigh);
       }
       else
       {
           head.AddToList(list, headLow, SEGMENT_SIZE - 1);
           Segment curr = head.Next;
           while (curr != tail)
           {
               curr.AddToList(list, 0, SEGMENT_SIZE - 1);
               curr = curr.Next;
           }
           //Add tail segment
           tail.AddToList(list, 0, tailHigh);
       }
   }
   finally
   {
       // This Decrement must happen after copying is over. 
       Interlocked.Decrement(ref m_numSnapshotTakers);
   }
   return list;
}

Если снимок - это все, что вам нужно, то вам повезло. Однако, по-видимому, нет встроенного способа получить и удалить все элементы из ConcurrentQueue потокобезопасным способом. Вам нужно будет испечь собственную синхронизацию, используя lock или аналогичный. Или сверните свое собственное (что может быть не так уж сложно, глядя на источник).

1 голос
/ 11 февраля 2020

Такого метода не существует, потому что то, что на самом деле TakeEverything делает неоднозначным:

  1. Возьмите элемент за элементом, пока очередь не опустеет, а затем верните взятые элементы.
  2. Заблокируйте полный доступ к очереди, сделайте снимок (все элементы в al oop) = очистите очередь, разблокируйте, верните снимок.

Рассмотрите первый сценарий и представьте, что другие потоки пишут в очередь в то время, когда вы удаляете элементы по очереди из очереди - должен ли TakeEverything метод включать их в результат?

Если да, то вы можете просто записать его как:

public List<GeneratedItem> TakeEverything()
{
    var list = new List<GeneratedItem>();

    while (queuedItems.TryDequeue(out var item))
    {
        list.Add(item);
    }

    return list;
}

Если нет, тогда я все равно использовал бы ConcurrentQueue (, потому что все члены экземпляра - методы и свойства - из обычного Queue не являются поточно-ориентированными ) и реализовал бы пользовательскую блокировку для каждого чтения / записи доступ, так что вы убедитесь, что вы не добавляете элементы, а "отбираете все" из очереди.

...