Я пытаюсь обрабатывать некоторые элементы в BlockingCollection параллельно, используя Parallel.ForEach (). При обработке предмета он может генерировать еще 0-2 предмета для обработки. Количество элементов для обработки всегда в конечном итоге достигнет 0.
Моя проблема заключается в том, что, поскольку потребитель также является производителем (элементы обработки могут генерировать больше элементов для обработки), я не могу вызвать функцию CompleteAdding () из BlockingCollection, когда коллекция BlockingCollection пуста, поскольку могут быть другие потоки, которые в настоящее время обрабатывают элемент, который будет генерировать больше предметов. Поэтому я не знаю, как сообщить BlockingCollection / Parallel.ForEach, что он может завершиться.
Вот пример ситуации (модифицированной для простоты)
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Example
{
class Example
{
static void Main(string[] args)
{
var process = new BlockingCollection<int>() { 30 };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
{
if (item > 20)
{
// Some add 2 items
process.Add(item - 1);
process.Add(item - 1);
Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2");
}
else if (item > 10)
{
// Some add 1 item
process.Add(item-1);
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1");
}
else
{
// Some add 0 items
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0");
}
});
// Parallel.ForEach never exits
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
}
}
Я пытался изменить MaxDegreeOfParallelism во время Parallel.ForEach до минимума количества элементов для обработки и Environment.ProcessorCount, но это ничего не делает во время Parallel.ForEach.
Я также попытался сохранить счетчик количества необработанных элементов и выполнить блокировку при обновлении этого числа в каждом потоке. Когда необработанные элементы равны 0, тогда я вызываю метод AddingCompleted. Это тоже не работает.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Example
{
class Example
{
static void Main(string[] args)
{
var runningLock = new object();
int running = 0;
var process = new BlockingCollection<int>() { 30 };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
{
lock (runningLock)
{
running++;
}
if (item > 20)
{
// Some add 2 items
process.Add(item - 1);
process.Add(item - 1);
Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2 | running: {running}");
}
else if (item > 10)
{
// Some add 1 item
process.Add(item - 1);
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1 | running: {running}");
}
else
{
// Some add 0 items
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0 | running: {running}");
}
lock (runningLock)
{
running--;
if (running == 0 && process.Count == 0)
{
Console.WriteLine($"Stopping | running: {running} | process.Count: {process.Count}");
process.CompleteAdding();
}
}
});
// Parallel.ForEach never exits
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
}
}
Должен ли я использовать что-то другое вместо Parallel.ForEach для этого?
Кроме того, при установке MaxDegreeOfParallelism в 1. Если начальный элемент BlockingCollection>> 27, он обрабатывает все нормально, однако, если он <= 26, он останавливает обработку элементов около 16? Кроме того, более высокий MaxDegreeOfParallelism приводит к остановке обработки элементов с меньшим числом. </p>
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Example
{
class Example
{
static void Main(string[] args)
{
// Normal
var process = new BlockingCollection<int>() { 27 };
// Stops around 16
//var process = new BlockingCollection<int>() { 26 };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = 1 };
Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
{
if (item > 20)
{
// Some add 2 items
process.Add(item - 1);
process.Add(item - 1);
Console.WriteLine($"Process Size: {process.Count} | Current Num: {item} | Added: 2");
}
else if (item > 10)
{
// Some add 1 item
process.Add(item - 1);
Console.WriteLine($"Process Size: {process.Count}| Current Num: {item} | Added: 1");
}
else
{
// Some add 0 items
Console.WriteLine($"Process Size: {process.Count}| Current Num: {item} | Added: 0");
}
});
// Parallel.ForEach never exits
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
}
}
Вот фактический код , если кто-то предпочитает смотреть на реальный код вместо абстрактной версии.