Как ускорить объемную реализацию BlockingCollection - PullRequest
0 голосов
/ 06 июля 2019

Я много раз использовал BlockingCollection для реализации шаблона производитель / потребитель, но у меня была плохая производительность с чрезвычайно детализированными данными из-за связанных издержек.Обычно это вынуждает меня импровизировать, разбивая на части / разбивая мои данные, другими словами, используя BlockingCollection<T[]> вместо BlockingCollection<T>.Вот последний пример .Это работает, но уродливо и подвержено ошибкам.Я заканчиваю тем, что использую вложенные циклы как у производителя, так и у потребителя, и я должен помнить Add, что остается в конце рабочей нагрузки производителя.Таким образом, у меня была идея реализовать коренастый BlockingCollection, который будет обрабатывать все эти сложности внутренне и выведет тот же простой интерфейс с существующим BlockingCollection.Моя проблема в том, что мне пока не удалось сопоставить производительность сложного ручного разбиения.Моя лучшая попытка все еще платит налог производительности около + 100%, для чрезвычайно детализированных данных (в основном только целочисленные значения).Поэтому я хотел бы представить здесь то, что я сделал до сих пор, в надежде получить совет, который поможет мне сократить разрыв в производительности.

Моя лучшая попытка - использовать ThreadLocal<List<T>>, поэтомучто каждый поток работает на выделенном чанке, устраняя необходимость в блокировках.

public class ChunkyBlockingCollection1<T>
{
    private readonly BlockingCollection<T[]> _blockingCollection;
    public readonly int _chunkSize;
    private readonly ThreadLocal<List<T>> _chunk;

    public ChunkyBlockingCollection1(int chunkSize)
    {
        _blockingCollection = new BlockingCollection<T[]>();
        _chunkSize = chunkSize;
        _chunk = new ThreadLocal<List<T>>(() => new List<T>(chunkSize), true);
    }

    public void Add(T item)
    {
        var chunk = _chunk.Value;
        chunk.Add(item);
        if (chunk.Count >= _chunkSize)
        {
            _blockingCollection.Add(chunk.ToArray());
            chunk.Clear();
        }
    }

    public void CompleteAdding()
    {
        var chunks = _chunk.Values.ToArray();
        foreach (var chunk in chunks)
        {
            _blockingCollection.Add(chunk.ToArray());
            chunk.Clear();
        }
        _blockingCollection.CompleteAdding();
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < chunk.Length; i++)
            {
                yield return chunk[i];
            }
        }
    }
}

Моя вторая лучшая попытка - использовать один List<T> в качестве чанка, доступ к которому осуществляется всеми потоками безопасным для потока способом, используязамок.Удивительно, но это лишь немного медленнее, чем решение ThreadLocal<List<T>>.

public class ChunkyBlockingCollection2<T>
{
    private readonly BlockingCollection<T[]> _blockingCollection;
    public readonly int _chunkSize;
    private readonly List<T> _chunk;
    private readonly object _locker = new object();

    public ChunkyBlockingCollection2(int chunkSize)
    {
        _blockingCollection = new BlockingCollection<T[]>();
        _chunkSize = chunkSize;
        _chunk = new List<T>(chunkSize);
    }

    public void Add(T item)
    {
        lock (_locker)
        {
            _chunk.Add(item);
            if (_chunk.Count >= _chunkSize)
            {
                _blockingCollection.Add(_chunk.ToArray());
                _chunk.Clear();
            }
        }
    }

    public void CompleteAdding()
    {
        lock (_locker)
        {
            _blockingCollection.Add(_chunk.ToArray());
            _chunk.Clear();
        }
        _blockingCollection.CompleteAdding();
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < chunk.Length; i++)
            {
                yield return chunk[i];
            }
        }
    }
}

Я также пытался использовать в качестве блока ConcurrentBag<T>, что привело к плохой производительности и проблеме с корректностью (потому что я неиспользовать замок).Другая попытка была заменить lock (_locker) на SpinLock, с еще худшей производительностью.Блокировка - это, очевидно, корень моих проблем, потому что, если я удаляю ее полностью, мой класс достигает оптимальной производительности.Конечно, удаление блокировки с треском проваливается с несколькими производителями.


Обновление: Я реализовал решение без блокировки , предложено , Ник , интенсивно использующий класс Interlocked.В конфигурации с одним производителем производительность немного лучше, но становится намного хуже с двумя или более производителями.Непостоянно много столкновений, которые вызывают вращение потоков.Реализация также очень сложна, что позволяет легко обнаруживать ошибки.

public class ChunkyBlockingCollection3<T>
{
    private readonly BlockingCollection<(T[], int)> _blockingCollection;
    public readonly int _chunkSize;
    private T[] _array;
    private int _arrayCount;
    private int _arrayCountOfCompleted;
    private T[] _emptyArray;

    public ChunkyBlockingCollection3(int chunkSize)
    {
        _chunkSize = chunkSize;
        _blockingCollection = new BlockingCollection<(T[], int)>();
        _array = new T[chunkSize];
        _arrayCount = 0;
        _arrayCountOfCompleted = 0;
        _emptyArray = new T[chunkSize];
    }

    public void Add(T item)
    {
        while (true) // Spin
        {
            int count = _arrayCount;
            while (true) // Spin
            {
                int previous = count;
                count++;
                int result = Interlocked.CompareExchange(ref _arrayCount,
                    count, previous);
                if (result == previous) break;
                count = result;
            }
            var array = Interlocked.CompareExchange(ref _array, null, null);
            if (array == null) throw new InvalidOperationException(
                    "The collection has been marked as complete.");
            if (count <= _chunkSize)
            {
                // There is empty space in the array
                array[count - 1] = item;
                Interlocked.Increment(ref _arrayCountOfCompleted);
                break; // Adding is completed
            }
            if (count == _chunkSize + 1)
            {
                // Array is full. Push it to the BlockingCollection.
                while (Interlocked.CompareExchange(
                    ref _arrayCountOfCompleted, 0, 0) < _chunkSize) { } // Spin
                _blockingCollection.Add((array, _chunkSize));
                T[] newArray;
                while ((newArray = Interlocked.CompareExchange(
                    ref _emptyArray, null, null)) == null) { } // Spin
                Interlocked.Exchange(ref _array, newArray);
                Interlocked.Exchange(ref _emptyArray, null);
                Interlocked.Exchange(ref _arrayCountOfCompleted, 0);
                Interlocked.Exchange(ref _arrayCount, 0); // Unlock other threads
                Interlocked.Exchange(ref _emptyArray, new T[_chunkSize]);
            }
            else
            {
                // Wait other thread to replace the full array with a new one.
                while (Interlocked.CompareExchange(
                    ref _arrayCount, 0, 0) > _chunkSize) { } // Spin
            }
        }
    }

    public void CompleteAdding()
    {
        var array = Interlocked.Exchange(ref _array, null);
        if (array != null)
        {
            int count = Interlocked.Exchange(ref _arrayCount, -1);
            while (Interlocked.CompareExchange(
                ref _arrayCountOfCompleted, 0, 0) < count) { } // Spin
            _blockingCollection.Add((array, count));
            _blockingCollection.CompleteAdding();
        }
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var (array, count) in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < count; i++)
            {
                yield return array[i];
            }
        }
    }
}

1 Ответ

1 голос
/ 09 июля 2019

Вы можете попробовать использовать массив для _chunk вместо использования List<T>.Затем вы можете использовать Interlocked.Increment, чтобы увеличить следующий индекс для заполнения на Add, и, когда ваш счет превысит размер вашего чанка, переместите все это в коллекцию блокировок и, конечно, сбросьте индекс в блокировке.

...