Разъяснение при запуске нескольких асинхронных задач параллельно с регулированием - PullRequest
1 голос
/ 11 апреля 2019

РЕДАКТИРОВАТЬ: Так как политика Bulkhead должна быть обернута политикой WaitAndRetry, в любом случае ... Я склоняюсь к примеру 3, как лучшему решению для сохранения параллелизма, регулирования и повторных попыток политики.Просто кажется странным, поскольку я думал, что Parallel.ForEach предназначен для операций синхронизации, а Bulkhead будет лучше для async

. Я пытаюсь запустить несколько асинхронных задач параллельно с регулированием с помощью polly AsyncBulkheadPolicy.Насколько я понимаю, метод политики ExecuteAsync сам по себе не вызывает вызов потока, а оставляет его по умолчанию TaskScheduler или кому-либо еще до него.Таким образом, если мои задачи каким-то образом связаны с процессором, тогда мне нужно использовать Parallel.ForEach при выполнении задач или Task.Run () с методом ExecuteAsync для планирования задач в фоновых потоках.

Может кто-топосмотрите на приведенные ниже примеры и поясните, как они будут работать с точки зрения парализма и объединения потоков?

https://github.com/App-vNext/Polly/wiki/Bulkhead - Операция: политика перегородки не создает свои собственные потоки, она предполагает, что мы уже сделали это.

async Task DoSomething(IEnumerable<object> objects);

//Example 1:
//Simple use, but then I don't have access to retry policies from polly
Parallel.ForEach(groupedObjects, (set) =>
{
    var task = DoSomething(set);
    task.Wait();
});

//Example 2:
//Uses default TaskScheduler which may or may not run the tasks in parallel
var parallelTasks = new List<Task>();
foreach (var set in groupedObjects)
{
    var task = bulkheadPolicy.ExecuteAsync(async () => DoSomething(set));
    parallelTasks.Add(task);
};

await Task.WhenAll(parallelTasks);

//Example 3:
//seems to defeat the purpose of the bulkhead since Parallel.ForEach and
//PolicyBulkheadAsync can both do throttling...just use basic RetryPolicy
//here? 
Parallel.ForEach(groupedObjects, (set) =>
{
    var task = bulkheadPolicy.ExecuteAsync(async () => DoSomething(set));
    task.Wait();
});


//Example 4:
//Task.Run still uses the default Task scheduler and isn't any different than
//Example 2; just makes more tasks...this is my understanding.
var parallelTasks = new List<Task>();
foreach (var set in groupedObjects)
{
    var task = Task.Run(async () => await bulkheadPolicy.ExecuteAsync(async () => DoSomething(set)));
    parallelTasks.Add(task);
};

await Task.WhenAll(parallelTasks);

DoSomething - это асинхронный метод, выполняющий операции над множеством объектов.Мне бы хотелось, чтобы это происходило в параллельных потоках, при соблюдении политик повторов от polly и с учетом регулирования.

Кажется, я путался в том, каково именно функциональное поведение Parallel.ForEach и использование Bulkhead.Однако, когда дело доходит до обработки задач / потоков, ExecuteAsync делает это.

1 Ответ

1 голос
/ 11 апреля 2019

Вы, вероятно, правы, что использование Parallel.ForEach наносит ущерб цели переборки.Я думаю, что простой цикл с задержкой сделает работу по переборке задач.Хотя я предполагаю, что в реальном примере будет непрерывный поток данных, а не предопределенный список или массив.

using Polly;
using Polly.Bulkhead;

static async Task Main(string[] args)
{
    var groupedObjects = Enumerable.Range(0, 10).Select(n => new object[] { n }); // Create 10 sets to work with
    var bulkheadPolicy = Policy.BulkheadAsync(3, 3); // maxParallelization, maxQueuingActions
    var parallelTasks = new List<Task>();
    foreach (var set in groupedObjects)
    {
        Console.WriteLine($"Scheduling, Available: {bulkheadPolicy.BulkheadAvailableCount}, QueueAvailable: {bulkheadPolicy.QueueAvailableCount}");
        var task = bulkheadPolicy.ExecuteAsync(async () => // Schedule the task and return immediately
        {
            await DoSomethingAsync(set).ConfigureAwait(false); // Await the task in another thread without capturing the context
        });
        parallelTasks.Add(task);
        await Task.Delay(50); // Interval between scheduling more tasks
    }

    var whenAllTasks = Task.WhenAll(parallelTasks);
    try
    {
        await whenAllTasks; // Await all the tasks (await throws only one of the exceptions)
    }
    catch
    {
        whenAllTasks.Exception.Handle(ex => ex is BulkheadRejectedException); // Ignore rejections, rethrow other exceptions
    }
    Console.WriteLine($"Processed: {parallelTasks.Where(t => t.Status == TaskStatus.RanToCompletion).Count()}");
    Console.WriteLine($"Faulted: {parallelTasks.Where(t => t.IsFaulted).Count()}");
}

static async Task DoSomethingAsync(IEnumerable<object> set)
{
    await Task.Delay(500).ConfigureAwait(false); // Pretend we are doing something with the set
}

Вывод:

Scheduling, Available: 3, QueueAvailable: 3
Scheduling, Available: 2, QueueAvailable: 3
Scheduling, Available: 1, QueueAvailable: 3
Scheduling, Available: 0, QueueAvailable: 3
Scheduling, Available: 0, QueueAvailable: 2
Scheduling, Available: 0, QueueAvailable: 1
Scheduling, Available: 0, QueueAvailable: 0
Scheduling, Available: 0, QueueAvailable: 0
Scheduling, Available: 0, QueueAvailable: 0
Scheduling, Available: 0, QueueAvailable: 1
Processed: 7
Faulted: 3

Обновление: Немного более реалистичная версия DoSomethingAsync, которая фактически заставляет ЦП выполнять какую-то реальную работу (загрузка ЦП почти 100% в моей четырехъядерной машине).

private static async Task DoSomethingAsync(IEnumerable<object> objects)
{
    await Task.Run(() =>
    {
        long sum = 0; for (int i = 0; i < 500000000; i++) sum += i;
    }).ConfigureAwait(false);
}

ЭтоМетод не работает для всех наборов данных.Он работает только для наборов, которые не отклоняются переборкой.

...