Вот одна идея, которая предполагает создание метода расширения для TaskFactory
.
public static class TaskFactoryExtension
{
public static Task StartNew(this TaskFactory target, Action action, int parallelism)
{
var tasks = new Task[parallelism];
for (int i = 0; i < parallelism; i++)
{
tasks[i] = target.StartNew(action);
}
return target.StartNew(() => Task.WaitAll(tasks));
}
}
Тогда ваш код вызова будет выглядеть следующим образом.
ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
var task = Task.Factory.StartNew(
() =>
{
T item;
while (queue.TryDequeue(out item))
{
ProcessItem(item);
}
}, n);
task.Wait(); // Optionally wait for everything to finish.
Вот еще одна идея, использующая Parallel.ForEach
. Проблема с этим подходом состоит в том, что ваши степени параллелизма не обязательно должны соблюдаться. Вы указываете только максимально допустимую сумму, а не абсолютную сумму.
ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n },
(item) =>
{
ProcessItem(item);
});