Я много раз использовал BlockingCollection
для реализации шаблона производитель / потребитель, но у меня была плохая производительность с чрезвычайно детализированными данными из-за связанных издержек.Обычно это вынуждает меня импровизировать, разбивая на части / разбивая мои данные, другими словами, используя BlockingCollection<T[]>
вместо BlockingCollection<T>
.Вот последний пример .Это работает, но уродливо и подвержено ошибкам.Я заканчиваю тем, что использую вложенные циклы как у производителя, так и у потребителя, и я должен помнить Add
, что остается в конце рабочей нагрузки производителя.Таким образом, у меня была идея реализовать коренастый BlockingCollection
, который будет обрабатывать все эти сложности внутренне и выведет тот же простой интерфейс с существующим BlockingCollection
.Моя проблема в том, что мне пока не удалось сопоставить производительность сложного ручного разбиения.Моя лучшая попытка все еще платит налог производительности около + 100%, для чрезвычайно детализированных данных (в основном только целочисленные значения).Поэтому я хотел бы представить здесь то, что я сделал до сих пор, в надежде получить совет, который поможет мне сократить разрыв в производительности.
Моя лучшая попытка - использовать ThreadLocal<List<T>>
, поэтомучто каждый поток работает на выделенном чанке, устраняя необходимость в блокировках.
public class ChunkyBlockingCollection1<T>
{
private readonly BlockingCollection<T[]> _blockingCollection;
public readonly int _chunkSize;
private readonly ThreadLocal<List<T>> _chunk;
public ChunkyBlockingCollection1(int chunkSize)
{
_blockingCollection = new BlockingCollection<T[]>();
_chunkSize = chunkSize;
_chunk = new ThreadLocal<List<T>>(() => new List<T>(chunkSize), true);
}
public void Add(T item)
{
var chunk = _chunk.Value;
chunk.Add(item);
if (chunk.Count >= _chunkSize)
{
_blockingCollection.Add(chunk.ToArray());
chunk.Clear();
}
}
public void CompleteAdding()
{
var chunks = _chunk.Values.ToArray();
foreach (var chunk in chunks)
{
_blockingCollection.Add(chunk.ToArray());
chunk.Clear();
}
_blockingCollection.CompleteAdding();
}
public IEnumerable<T> GetConsumingEnumerable()
{
foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
{
for (int i = 0; i < chunk.Length; i++)
{
yield return chunk[i];
}
}
}
}
Моя вторая лучшая попытка - использовать один List<T>
в качестве чанка, доступ к которому осуществляется всеми потоками безопасным для потока способом, используязамок.Удивительно, но это лишь немного медленнее, чем решение ThreadLocal<List<T>>
.
public class ChunkyBlockingCollection2<T>
{
private readonly BlockingCollection<T[]> _blockingCollection;
public readonly int _chunkSize;
private readonly List<T> _chunk;
private readonly object _locker = new object();
public ChunkyBlockingCollection2(int chunkSize)
{
_blockingCollection = new BlockingCollection<T[]>();
_chunkSize = chunkSize;
_chunk = new List<T>(chunkSize);
}
public void Add(T item)
{
lock (_locker)
{
_chunk.Add(item);
if (_chunk.Count >= _chunkSize)
{
_blockingCollection.Add(_chunk.ToArray());
_chunk.Clear();
}
}
}
public void CompleteAdding()
{
lock (_locker)
{
_blockingCollection.Add(_chunk.ToArray());
_chunk.Clear();
}
_blockingCollection.CompleteAdding();
}
public IEnumerable<T> GetConsumingEnumerable()
{
foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
{
for (int i = 0; i < chunk.Length; i++)
{
yield return chunk[i];
}
}
}
}
Я также пытался использовать в качестве блока ConcurrentBag<T>
, что привело к плохой производительности и проблеме с корректностью (потому что я неиспользовать замок).Другая попытка была заменить lock (_locker)
на SpinLock
, с еще худшей производительностью.Блокировка - это, очевидно, корень моих проблем, потому что, если я удаляю ее полностью, мой класс достигает оптимальной производительности.Конечно, удаление блокировки с треском проваливается с несколькими производителями.
Обновление: Я реализовал решение без блокировки , предложено , Ник , интенсивно использующий класс Interlocked
.В конфигурации с одним производителем производительность немного лучше, но становится намного хуже с двумя или более производителями.Непостоянно много столкновений, которые вызывают вращение потоков.Реализация также очень сложна, что позволяет легко обнаруживать ошибки.
public class ChunkyBlockingCollection3<T>
{
private readonly BlockingCollection<(T[], int)> _blockingCollection;
public readonly int _chunkSize;
private T[] _array;
private int _arrayCount;
private int _arrayCountOfCompleted;
private T[] _emptyArray;
public ChunkyBlockingCollection3(int chunkSize)
{
_chunkSize = chunkSize;
_blockingCollection = new BlockingCollection<(T[], int)>();
_array = new T[chunkSize];
_arrayCount = 0;
_arrayCountOfCompleted = 0;
_emptyArray = new T[chunkSize];
}
public void Add(T item)
{
while (true) // Spin
{
int count = _arrayCount;
while (true) // Spin
{
int previous = count;
count++;
int result = Interlocked.CompareExchange(ref _arrayCount,
count, previous);
if (result == previous) break;
count = result;
}
var array = Interlocked.CompareExchange(ref _array, null, null);
if (array == null) throw new InvalidOperationException(
"The collection has been marked as complete.");
if (count <= _chunkSize)
{
// There is empty space in the array
array[count - 1] = item;
Interlocked.Increment(ref _arrayCountOfCompleted);
break; // Adding is completed
}
if (count == _chunkSize + 1)
{
// Array is full. Push it to the BlockingCollection.
while (Interlocked.CompareExchange(
ref _arrayCountOfCompleted, 0, 0) < _chunkSize) { } // Spin
_blockingCollection.Add((array, _chunkSize));
T[] newArray;
while ((newArray = Interlocked.CompareExchange(
ref _emptyArray, null, null)) == null) { } // Spin
Interlocked.Exchange(ref _array, newArray);
Interlocked.Exchange(ref _emptyArray, null);
Interlocked.Exchange(ref _arrayCountOfCompleted, 0);
Interlocked.Exchange(ref _arrayCount, 0); // Unlock other threads
Interlocked.Exchange(ref _emptyArray, new T[_chunkSize]);
}
else
{
// Wait other thread to replace the full array with a new one.
while (Interlocked.CompareExchange(
ref _arrayCount, 0, 0) > _chunkSize) { } // Spin
}
}
}
public void CompleteAdding()
{
var array = Interlocked.Exchange(ref _array, null);
if (array != null)
{
int count = Interlocked.Exchange(ref _arrayCount, -1);
while (Interlocked.CompareExchange(
ref _arrayCountOfCompleted, 0, 0) < count) { } // Spin
_blockingCollection.Add((array, count));
_blockingCollection.CompleteAdding();
}
}
public IEnumerable<T> GetConsumingEnumerable()
{
foreach (var (array, count) in _blockingCollection.GetConsumingEnumerable())
{
for (int i = 0; i < count; i++)
{
yield return array[i];
}
}
}
}