Производительность BlockingCollection (T) - PullRequest
22 голосов
/ 14 июня 2010

Некоторое время в моей компании мы использовали доморощенную реализацию ObjectPool<T>, которая обеспечивает блокировку доступа к ее содержимому.Это довольно просто: Queue<T>, object для блокировки и AutoResetEvent для сигнализации «заимствованному» потоку при добавлении элемента.

Мясо класса действительно таководва метода:

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

Мы использовали этот и несколько других классов коллекций вместо тех, которые предоставлены System.Collections.Concurrent, потому что мы используем .NET 3.5, а не 4.0.Но недавно мы обнаружили, что, поскольку мы используем Reactive Extensions , на самом деле do имеет доступное нам пространство имен Concurrent (в System.Threading.dll).

Естественно, я решил, что, поскольку BlockingCollection<T> является одним из базовых классов в пространстве имен Concurrent, он, вероятно, будет предлагать лучшую производительность, чем все, что я или мои товарищи по команде написали.

ИтакЯ попытался написать новую реализацию, которая работает очень просто:

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

К моему удивлению, согласно некоторым простым тестам (заимствование / возврат в пул несколько тысяч раз из нескольких потоков), наша первоначальная реализация значительно превосходит BlockingCollection<T> с точки зрения производительности .Похоже, они оба работают правильно ;просто наша первоначальная реализация выглядит намного быстрее.

Мой вопрос:

  1. С чего бы это?Возможно, это связано с тем, что BlockingCollection<T> обеспечивает большую гибкость (насколько я понимаю, он работает, оборачивая IProducerConsumerCollection<T>), что обязательно приводит к снижению производительности?
  2. Это просто неуместное неправильное использованиекласса BlockingCollection<T>?
  3. Если это правильное использование BlockingCollection<T>, я просто не правильно использую?Например, является ли Take / Add подход слишком упрощенным, и есть гораздо более эффективный способ получить ту же функциональность?

Разве у кого-то есть что-то, что можно предложить в ответ на это?Третий вопрос. Похоже, что сейчас мы будем придерживаться нашей первоначальной реализации.

Ответы [ 3 ]

26 голосов
/ 14 июня 2010

Здесь есть несколько потенциальных возможностей.

Во-первых, BlockingCollection<T> в Reactive Extensions является обратным портом и не совсем совпадает с финальной версией .NET 4. Я не удивлюсь, если производительность этого бэкпорта отличается от .NET 4 RTM (хотя я специально не представлял эту коллекцию). Большая часть TPL работает лучше в .NET 4, чем в .NET 3.5 backport.

При этом я подозреваю, что ваша реализация превзойдет BlockingCollection<T>, если у вас будет один поток производителя и один поток потребителя. С одним производителем и одним потребителем ваша блокировка будет оказывать меньшее влияние на общую производительность, а событие сброса является очень эффективным средством ожидания на стороне потребителя.

Однако, BlockingCollection<T> разработан, чтобы позволить многим потокам производителя очень хорошо «ставить» в очередь данные. Это не будет хорошо работать с вашей реализацией, так как конфликт блокировок станет довольно быстро проблематичным.

При этом я также хотел бы указать на одно заблуждение:

... это, вероятно, даст лучшую производительность, чем все, что я или мои товарищи по команде написали.

Это часто не так. Классы коллекции каркасов обычно работают очень хорошо , но часто не являются наиболее производительным вариантом для данного сценария. При этом они, как правило, хорошо выступают, будучи очень гибкими и очень крепкими. Они часто имеют тенденцию очень хорошо масштабироваться. «Домашние» классы коллекций часто превосходят коллекции фреймворков в определенных сценариях, но, как правило, проблематичны при использовании в сценариях, отличных от тех, для которых они были специально разработаны. Я подозреваю, что это одна из тех ситуаций.

11 голосов
/ 26 марта 2015

Я пытался BlockingCollection против ConurrentQueue/AutoResetEvent комбо (аналогично решению OP, но без блокировки) в .Net 4, и последнее комбо было , поэтому намного быстрее для моего варианта использования, что я отказалсяBlockingCollection.К сожалению, это было почти год назад, и я не смог найти результаты теста.

Использование отдельного AutoResetEvent не слишком усложняет ситуацию.Фактически, можно даже раз и навсегда абстрагировать его в BlockingCollectionSlim ....

BlockingCollection внутренне полагается также на ConcurrentQueue, но делает некоторое дополнительное жонглированиес тонкими семафорами и токенами аннулирования , что дает дополнительные функции, но за дополнительную плату, даже если не используется.Следует также отметить, что BlockingCollection не состоит в браке с ConcurrentQueue, но вместо этого может использоваться с другими реализациями IProducerConsumerCollection.


Неограниченная, довольно голая реализация BlockingCollectionSlim:

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);
1 голос
/ 01 января 2019

Я столкнулся с теми же проблемами с производительностью BlockingCollection в .Net 4.7.2 и нашел этот пост.В моем случае это MultipleProducers-MultipleConsumers, в частности небольшие фрагменты данных считываются из многих источников и должны обрабатываться многими фильтрами.Было использовано несколько (Env.ProcessorCount) BlockingCollections, и я получил профилировщик производительности, который сказал мне, что BlockingCollection.GetConsumingEnumerable.MoveNext() потребляет больше процессорного времени, чем фактическая фильтрация!

Спасибо, @ Евгений Бересовский, за ваш код.К вашему сведению: в моей среде это было почти вдвое медленнее, чем BlockingCollection.Итак, вот моя SpinLocked BlockingCollection:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

А для кода, критичного к производительности, я бы рекомендовал избегать readonly модификатора поля.Он добавляет проверку каждого поля доступа в IL.Со следующим тестовым кодом

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

На Core i5-3210M с 2 ядрами и включенным HT я получил следующий вывод:

BlockingCollection     0.0006ms
BlockingCollectionSlim 0.0010ms (Eugene Beresovsky implementation)
BlockingCollectionSpin 0.0003ms

Итак, версия SpinLocked в два раза быстрее.Net BlockingCollection.Но я бы предложил использовать только его!если вы действительно предпочитаете производительность, а не простоту кода (и удобство сопровождения).

...