Используя BlockingCollection <>: OperationCanceledException, есть ли лучший способ? - PullRequest
29 голосов
/ 21 января 2012

Я использую (откровенно великолепный) тип BlockingCollection<> для многопоточного, высокопроизводительного приложения.

Существует большая пропускная способность при сборе, а на микроуровне это оченьпроизводительный.Тем не менее, для каждого «пакета» он всегда будет заканчиваться пометкой токена отмены.Это приводит к возникновению исключения при любом ожидающем вызове Take.Это нормально, но я бы согласился на возвращаемое значение или выходной параметр, чтобы сигнализировать об этом, потому что a) исключения имеют очевидные накладные расходы и b) при отладке я не хочу вручную отключать прерывание по исключению для этого конкретногоисключение.

Реализация кажется интенсивной, и в теории, я полагаю, я мог бы разобрать и воссоздать свою собственную версию, в которой не использовались исключения, но, возможно, есть менее сложный способ?

Я мог бы добавитьnull (или, если нет, объект-заполнитель) объекта в коллекции, указывающий на то, что процесс должен завершиться, однако также должен быть способ красиво прервать, то есть разбудить ожидающие потоки и как-то сказать им, что что-то произошло.

Итак - альтернативные типы коллекций?Воссоздать мой собственный?Какой-то способ злоупотребить этим?

(Некоторый контекст: я пошел с BlockingCollection<>, потому что он имеет преимущество перед ручной блокировкой вокруг Queue. Как я могу сказать, использование потоковых примитивов превосходно ив моем случае, несколько миллисекунд здесь и там, и оптимальное ядро ​​очень важно использовать.)

Редактировать: я только что открыл награду за это. Я не верю, что ответ Анастасиосиала покрывает вопрос, который я поднимаю в своем комментарии к нему .Я знаю, что это сложная проблема.Кто-нибудь может помочь?

Ответы [ 4 ]

9 голосов
/ 26 января 2012

Как я полагаю, вы уже сделали это самостоятельно, глядя на отраженный источник BlockingCollection, к сожалению, выглядит так, что, когда CancellationToken передается в BlockingCollection и он отменяется, вы получите исключение OperationCancelledException, как видно на рисунке ниже (спара обходных решений после изображения)

GetConsumingEnumerable вызывает TryTakeWithNoTimeValidation для BlockingCollection, что, в свою очередь, вызывает это исключение.

enter image description here

Обходной путь # 1

Одной из возможных стратегий будет, если вы будете иметь больше контроля над своими производителями и потребителями, чемпередав токен отмены в коллекцию BlockingCollection (что вызовет это исключение), вы передаете токен отмены своим производителям и своим потребителям.

Если ваши производители не производят, а ваши потребители не потребляют, товы фактически отменили операцию, не вызывая это исключение и передав CancellationToken.None в BlockingCollection.

Особые случаи Отмена, когда BlockingCollection имеет значение BoundedCapacity или Empty

Производители заблокированы : Потоки производителей будут заблокированы, когда BoundedCapacity на BlockingCollection имеет значениедостиг.Следовательно, при попытке отмены, когда BlockingCollection имеет значение BoundedCapacity (что означает, что ваши потребители не заблокированы, а производители заблокированы, поскольку они не могут добавлять какие-либо дополнительные элементы в очередь), вам потребуется разрешить использование дополнительных элементов (одиндля каждого потока производителей), который разблокирует производителей (поскольку они блокируются при добавлении в blockingCollection) и, в свою очередь, позволяет логике отмены срабатывать на стороне производителя.

Потребители заблокированы : Когда ваши потребители заблокированы из-за того, что очередь пуста, вы можете вставить пустую единицу работы (по одной для каждого потока потребителя) в коллекцию блокировки, чтобы разблокироватьпотребительские потоки и позволяют логике отмены работать на стороне потребителя.

Если в очереди есть элементы и не достигнут предел, такой как BoundedCapacity или Empty, то потоки производителей и потребителей не должны блокироваться.

Обходной путь # 2

Использование единицы отмены работы.

Когда ваше приложение необходимо отменить, тогда ваши производители (может быть, достаточно одного производителя, в то время как другие просто отменяют производство) будут производить единицу работы отмены (может быть нулевой, как вы также упомянули или какой-то класс, который реализует).интерфейс маркера).Когда потребители потребляют эту единицу работы и обнаруживают, что это на самом деле единица работы отмены, их логика отмены срабатывает. Количество единиц отмены работы, которая должна быть произведена, должно равняться количеству потоков потребителя.

Опять же, нужно быть осторожным, когда мы приближаемся к BoundedCapacity, поскольку это может быть признаком того, что некоторые производители заблокированы.В зависимости от количества производителей / потребителей вы можете потреблять потребителя, пока все производители (кроме 1) не отключатся.Это гарантирует, что вокруг не останется ни одного производителя.Когда остается только 1 производитель, ваш последний потребитель может отключиться, и производитель может прекратить производство единиц отмены работы.

1 голос
/ 20 июня 2012

Kieren,

Из моего осмотра я лично не знаю ни одного потокаобезопасного типа для шаблона ProducerConsumer, который бы выполнял именно то, что вы хотели.Я не претендую на это как на конкурентное решение, но предлагаю вам украсить BlockingCollection<T> несколькими extension method, что даст вам свободу предоставлять любые встроенные или пользовательские типы вместо значений по умолчанию CancellationToken.

Этап 1:

Ниже приведен список метода по умолчанию, который использует метод подчеркивания TryAddWithNoTimeValidation для добавления в очередь.

public void Add(T item){
      this.TryAddWithNoTimeValidation(item, -1, new CancellationToken());
}

public void Add(T item, CancellationToken cancellationToken){
      this.TryAddWithNoTimeValidation(item, -1, cancellationToken);
    }

public bool TryAdd(T item){
      return this.TryAddWithNoTimeValidation(item, 0, new CancellationToken());
    }

public bool TryAdd(T item, TimeSpan timeout){
      BlockingCollection<T>.ValidateTimeout(timeout);
      return this.TryAddWithNoTimeValidation(item, (int) timeout.TotalMilliseconds, new CancellationToken());
    }

public bool TryAdd(T item, int millisecondsTimeout){
      BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
      return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, new           CancellationToken());
}

public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken){
 BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
 return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
}

Теперь вы можете предоставить расширение для любого /все методы, которые вас интересуют.

Этап 2:

Теперь вы ссылаетесь на вашу реализацию TryAddWithNoTimeValidation вместо значения по умолчанию.

Я могупредоставим вам альтернативную версию TryAddWithNoTimeValidation, которая безопасно продолжится без исключения OperationCancellation.

1 голос
/ 19 июня 2012

Вы можете подать сигнал об окончании партии, установив флаг для последнего элемента (добавьте в него свойство bool IsLastItem или оберните его). Или вы можете отправить ноль как последний элемент (хотя не уверен, что ноль правильно проходит через блокирующий набор).

Если вы можете устранить необходимость в концепции «пакета», вы можете создать дополнительный поток, чтобы непрерывно брать () и обрабатывать новые данные из вашей блокирующей коллекции, и больше ничего не делать.

1 голос
/ 16 июня 2012

Как насчет BlockingQueue, который я делал недавно?

http://apichange.codeplex.com/SourceControl/changeset/view/76c98b8c7311#ApiChange.Api%2fsrc%2fInfrastructure%2fBlockingQueue.cs

Все должно работать без исключений.Текущая очередь просто закрывает событие при утилизации, что может быть не тем, что вы хотите.Вы можете захотеть сделать enque null и подождать, пока все элементы не будут обработаны.Помимо этого он должен удовлетворить ваши потребности.

using System.Collections.Generic;
using System.Collections;
using System.Threading;
using System;

namespace ApiChange.Infrastructure
{

    /// <summary>
    /// A blocking queue which supports end markers to signal that no more work is left by inserting
    /// a null reference. This constrains the queue to reference types only. 
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class BlockingQueue<T> : IEnumerable<T>, IEnumerable, IDisposable where T : class
    {
        /// <summary>
        /// The queue used to store the elements
        /// </summary>
        private Queue<T> myQueue = new Queue<T>();
        bool myAllItemsProcessed = false;
        ManualResetEvent myEmptyEvent = new ManualResetEvent(false);

        /// <summary>
        /// Deques an element from the queue and returns it.
        /// If the queue is empty the thread will block. If the queue is stopped it will immedieately
        /// return with null.
        /// </summary>
        /// <returns>An object of type T</returns>      
        public T Dequeue()
        {
            if (myAllItemsProcessed)
                return null;

            lock (myQueue)
            {
                while (myQueue.Count == 0) 
                {
                    if(!Monitor.Wait(myQueue, 45))
                    {
                        // dispatch any work which is not done yet
                        if( myQueue.Count > 0 )
                            continue;
                    }

                    // finito 
                    if (myAllItemsProcessed)
                    {
                        return null;
                    }
                }

                T result = myQueue.Dequeue();
                if (result == null)
                {
                    myAllItemsProcessed = true;
                    myEmptyEvent.Set();
                }
                return result;
            }
        }

        /// <summary>
        /// Releases the waiters by enqueuing a null reference which causes all waiters to be released. 
        /// The will then get a null reference as queued element to signal that they should terminate.
        /// </summary>
        public void ReleaseWaiters()
        {
            Enqueue(null);
        }

        /// <summary>
        /// Waits the until empty. This does not mean that all items are already process. Only that
        /// the queue contains no more pending work. 
        /// </summary>
        public void WaitUntilEmpty()
        {
            myEmptyEvent.WaitOne();
        }

        /// <summary>
        /// Adds an element of type T to the queue. 
        /// The consumer thread is notified (if waiting)
        /// </summary>
        /// <param name="data_in">An object of type T</param>      
        public void Enqueue(T data_in)
        {
            lock (myQueue)
            {
                myQueue.Enqueue(data_in);
                Monitor.PulseAll(myQueue);
            }
        }

        /// <summary>
        /// Returns an IEnumerator of Type T for this queue
        /// </summary>
        /// <returns></returns>    
        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            while (true)
            {
                T item = Dequeue();
                if (item == null)
                    break;
                else
                    yield return item;
            }
        }

        /// <summary>
        /// Returns a untyped IEnumerator for this queue
        /// </summary>
        /// <returns></returns>     
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }


        #region IDisposable Members

        /// <summary>
        /// Closes the EmptyEvent WaitHandle.
        /// </summary>
        public void Dispose()
        {
            myEmptyEvent.Close();
        }

        #endregion
    }
}
...