Вы, вероятно, правы, что использование Parallel.ForEach
наносит ущерб цели переборки.Я думаю, что простой цикл с задержкой сделает работу по переборке задач.Хотя я предполагаю, что в реальном примере будет непрерывный поток данных, а не предопределенный список или массив.
using Polly;
using Polly.Bulkhead;
static async Task Main(string[] args)
{
var groupedObjects = Enumerable.Range(0, 10).Select(n => new object[] { n }); // Create 10 sets to work with
var bulkheadPolicy = Policy.BulkheadAsync(3, 3); // maxParallelization, maxQueuingActions
var parallelTasks = new List<Task>();
foreach (var set in groupedObjects)
{
Console.WriteLine($"Scheduling, Available: {bulkheadPolicy.BulkheadAvailableCount}, QueueAvailable: {bulkheadPolicy.QueueAvailableCount}");
var task = bulkheadPolicy.ExecuteAsync(async () => // Schedule the task and return immediately
{
await DoSomethingAsync(set).ConfigureAwait(false); // Await the task in another thread without capturing the context
});
parallelTasks.Add(task);
await Task.Delay(50); // Interval between scheduling more tasks
}
var whenAllTasks = Task.WhenAll(parallelTasks);
try
{
await whenAllTasks; // Await all the tasks (await throws only one of the exceptions)
}
catch
{
whenAllTasks.Exception.Handle(ex => ex is BulkheadRejectedException); // Ignore rejections, rethrow other exceptions
}
Console.WriteLine($"Processed: {parallelTasks.Where(t => t.Status == TaskStatus.RanToCompletion).Count()}");
Console.WriteLine($"Faulted: {parallelTasks.Where(t => t.IsFaulted).Count()}");
}
static async Task DoSomethingAsync(IEnumerable<object> set)
{
await Task.Delay(500).ConfigureAwait(false); // Pretend we are doing something with the set
}
Вывод:
Scheduling, Available: 3, QueueAvailable: 3
Scheduling, Available: 2, QueueAvailable: 3
Scheduling, Available: 1, QueueAvailable: 3
Scheduling, Available: 0, QueueAvailable: 3
Scheduling, Available: 0, QueueAvailable: 2
Scheduling, Available: 0, QueueAvailable: 1
Scheduling, Available: 0, QueueAvailable: 0
Scheduling, Available: 0, QueueAvailable: 0
Scheduling, Available: 0, QueueAvailable: 0
Scheduling, Available: 0, QueueAvailable: 1
Processed: 7
Faulted: 3
Обновление: Немного более реалистичная версия DoSomethingAsync
, которая фактически заставляет ЦП выполнять какую-то реальную работу (загрузка ЦП почти 100% в моей четырехъядерной машине).
private static async Task DoSomethingAsync(IEnumerable<object> objects)
{
await Task.Run(() =>
{
long sum = 0; for (int i = 0; i < 500000000; i++) sum += i;
}).ConfigureAwait(false);
}
ЭтоМетод не работает для всех наборов данных.Он работает только для наборов, которые не отклоняются переборкой.