SemaphoreSlim не душит задачи - PullRequest
       11

SemaphoreSlim не душит задачи

2 голосов
/ 01 октября 2019

Я создал следующий метод TestThrottled, чтобы попытаться регулировать мои задачи, но он не регулируется вообще, когда я вызываю WhenAll, и у этого метода одинаковое прошедшее время. Я что-то делаю не так?

    private static async Task<T[]> TestThrottled<T>(List<Task<T>> tasks, int maxDegreeOfParallelism)
    {
        var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
        var tasksParallelized = new List<Task<T>>();

        foreach (var task in tasks)
        {
            var taskParallelized = Task.Run(async () =>
            {
                try
                {
                    await semaphore.WaitAsync();

                    return await task;
                }
                finally
                {
                    semaphore.Release();
                }
            });
            tasksParallelized.Add(taskParallelized);
        }

        return await Task.WhenAll(tasksParallelized);
    }

    private static async Task<int> TestAsync()
    {
        await Task.Delay(1000);

        return 1;
    }

    static async Task Main(string[] args)
    {
        var sw = Stopwatch.StartNew();

        var tasks = new List<Task<int>>();
        var ints = new List<int>();

        for (int i = 0; i < 30; i++)
        {
            tasks.Add(TestAsync());
        }
        ints.AddRange(await TestThrottled(tasks, 1));

        Console.WriteLine($"{sw.ElapsedMilliseconds}, count: {ints.Count}");
        Console.ReadLine();
    }

Ответы [ 4 ]

3 голосов
/ 01 октября 2019

Еще один способ сделать это - TPL DataFlow , он имеет все, что вам уже нужно, и может обслуживать более сложные конвейерную обработку , если это необходимо, и может быть более настраиваемым. Это также избавляет вас от разгрузки другой задачи, как в вашем примере решения

private static async Task<IList<T>> TestThrottled<T>(IEnumerable<Func<Task<T>>> tasks, int maxDegreeOfParallelism)
{
   var options = new ExecutionDataflowBlockOptions() { EnsureOrdered = false, MaxDegreeOfParallelism = maxDegreeOfParallelism };

   var transform = new TransformBlock<Func<Task<T>>, T>(func => func.Invoke(), options);
   var outputBufferBlock = new BufferBlock<T>();

   transform.LinkTo(outputBufferBlock, new DataflowLinkOptions(){PropagateCompletion = true});

   foreach (var task in tasks)
      transform.Post(task);

   transform.Complete();
   await outputBufferBlock. Completion;

   outputBufferBlock.TryReceiveAll(out var result);

   return result;
}
3 голосов
/ 01 октября 2019

Основной проблемой здесь является поведение async/await. Что происходит, когда вы звоните

private static async Task<int> TestAsync()
{ 
    await Task.Delay(1000);
    return 1;
}

TestAync();

TestAsync(). В этом методе вызывается Task.Delay(). Это создаст задачу, которая заканчивается через 1000 мс. Наконец, вы возвращаете эту задачу (фактически, другую задачу, запланированную как продолжение задачи, возвращаемой Task.Delay()).

Все эти задачи создаются примерно в одно и то же время в цикле в Main(). Поэтому, хотя у вас может быть семафор, который не позволяет нескольким потокам одновременно вызывать await task, все они запланированы так, чтобы завершаться примерно в одно и то же время. await ожидает только до тех пор, пока задача еще не завершена. Таким образом, как только первый поток освобождает семафор (что происходит примерно через секунду), следующий поток может войти в критическую область, где он обнаружит, что задача уже завершена (или очень близка к завершению). Затем он может сразу же освободить семафор. Это происходит и для остальных задач, и вы получаете общее время выполнения около одной секунды.

1 голос
/ 01 октября 2019

Ключом к решению этой проблемы является то, что троттлер может запускать задачи, а не запускать их заранее. И поскольку запуск задач явно со старым методом Task.Start очень ограничен (предшествует и не может использовать механизм асинхронного ожидания), единственная альтернатива - позволить троттлеру создавать задачи. Есть несколько способов сделать это:

1) Пропускать фабрики задач вместо задач. Этот метод уже был продемонстрирован в других ответах.

private static async Task<TResult[]> RunAsyncThrottled<TResult>(
    IEnumerable<Func<Task<TResult>>> taskFactories,
    int maxDegreeOfParallelism)
{
    //...
    foreach (var taskFactory in taskFactories)
        //...
        var task = taskFactory();
        TResult result = await task;
}

2) Передать последовательность элементов и одну фабрику задач, которая принимает элемент в качестве параметра. Это наиболее часто используемый метод:

private static async Task<TResult[]> RunAsyncThrottled<TSource, TResult>(
    IEnumerable<TSource> items, Func<TSource, Task<TResult>> taskFactory,
    int maxDegreeOfParallelism)
{
    //...
    foreach (var item in items)
        //...
        var task = taskFactory(item);
        TResult result = await task;
}

3) Передача отложенного множества задач. Такое перечислимое можно создать с помощью LINQ или итераторов (методы, которые yield). Здесь является полным примером.

private static async Task<TResult[]> RunAsyncThrottled<TResult>(
    IEnumerable<Task<TResult>> tasks, int maxDegreeOfParallelism)
{
    if (tasks is ICollection<Task<TResult>>) throw new ArgumentException(
        "The enumerable should not be materialized.", nameof(tasks));
    //...
    foreach (var task in tasks)
        //...
        TResult result = await task;
}

Поскольку C # 8 теперь освобождено, существует альтернатива возвращаемому значению метода. Вместо возврата Task<TResult[]> он может вернуть IAsyncEnumerable<TResult>, что позволяет выполнять асинхронное перечисление с await foreach.

private static async IAsyncEnumerable<TResult> RunAsyncThrottled<TSource, TResult>(
    IEnumerable<TSource> items, Func<TSource, Task<TResult>> taskFactory,
    int maxDegreeOfParallelism)
{
    //...
    foreach (var item in items)
        //...
        yield return await taskFactory(item);
}
1 голос
/ 01 октября 2019

Я решил свою проблему (создаю общий обработчик заданных задач, получающий список асинхронных методов), выполнив следующие действия:

    private static async Task<T[]> RunAsyncThrottled<T>(IEnumerable<Func<Task<T>>> tasks, int maxDegreeOfParallelism)
    {
        var tasksParallelized = new List<Task<T>>();

        using (var semaphore = new SemaphoreSlim(maxDegreeOfParallelism))
        {
            foreach (var task in tasks)
            {
                var taskParallelized = Task.Run(async () =>
                {
                    await semaphore.WaitAsync();
                    try
                    {
                        return await task.Invoke();
                    }
                    finally
                    {
                        semaphore.Release();
                    }
                });
                tasksParallelized.Add(taskParallelized);
            }

            return await Task.WhenAll(tasksParallelized);
        }
    }

    private static async Task<int> TestAsync(int num)
    {
        await Task.Delay(1000);

        return 1 + num;
    }

    static async Task Main(string[] args)
    {
        var sw = Stopwatch.StartNew();

        var tasks = new List<Func<Task<int>>>();
        var ints = new List<int>();

        for (int i = 0; i < 10; i++)
        {
            tasks.Add(() => TestAsync(12000));
        }

        ints.AddRange(await RunAsyncThrottled(tasks, 1000));

        Console.WriteLine($"{sw.Elapsed.TotalMilliseconds}, count: {ints.Count}");
        Console.ReadLine();
    }
...