System.Threading.Tasks - Ограничить количество одновременных задач - PullRequest
42 голосов
/ 24 мая 2010

Я только начал изучать новые достоинства System.Threading.Tasks в .Net 4.0 и хотел бы узнать, есть ли какая-либо встроенная поддержка для ограничения количества одновременных задач, которые выполняются одновременно, или если это должно быть обработано вручную.

E.G: Если мне нужно вызвать метод расчета 100 раз, есть ли способ установить 100 задач, но только 5 из них выполняются одновременно? Ответ может состоять в том, чтобы просто создать 5 задач, вызвать Task.WaitAny и создать новую задачу по завершении каждой предыдущей. Я просто хочу убедиться, что я не пропустил трюк, если есть лучший способ сделать это.

В принципе, есть встроенный способ сделать это:

Dim taskArray() = {New Task(Function() DoComputation1()),
                   New Task(Function() DoComputation2()),
                   ...
                   New Task(Function() DoComputation100())}

Dim maxConcurrentThreads As Integer = 5
RunAllTasks(taskArray, maxConcurrentThreads)

Спасибо за любую помощь.

Ответы [ 8 ]

45 голосов
/ 21 января 2011

Я знаю, что это почти год, но я нашел гораздо более простой способ достичь этого, поэтому я решил поделиться:

Dim actionsArray() As Action = 
     new Action(){
         New Action(Sub() DoComputation1()),
         New Action(Sub() DoComputation2()),
         ...
         New Action(Sub() DoComputation100())
      }

System.Threading.Tasks.Parallel.Invoke(New Tasks.ParallelOptions() With {.MaxDegreeOfParallelism = 5}, actionsArray)

Вуаля!

28 голосов
/ 12 августа 2010

Я знаю, что это старый поток, но я просто хотел поделиться своим решением этой проблемы: использовать семафоры.

(Это в C #)

private void RunAllActions(IEnumerable<Action> actions, int maxConcurrency)
{
    using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        foreach(Action action in actions)
        {
            Task.Factory.StartNew(() =>
            {
                concurrencySemaphore.Wait();
                try
                {
                    action();
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });
        }
    }
}
7 голосов
/ 03 ноября 2010

Решением может быть просмотр готового кода от Microsoft здесь .

Описание выглядит следующим образом: «Предоставляет планировщик задач, который обеспечивает максимальный уровень параллелизма при работе поверх ThreadPool.» так же, как свойство MaxDegreeOfParallelism в ParallelOptions.

6 голосов
/ 25 февраля 2014

C # эквивалент образца, предоставленного Джеймсом

Action[] actionsArray = new Action[] {
new Action(() => DoComputation1()),
new Action(() => DoComputation2()),
    //...
new Action(() => DoComputation100())
  };

   System.Threading.Tasks.Parallel.Invoke(new Tasks.ParallelOptions {MaxDegreeOfParallelism =  5 }, actionsArray)
3 голосов
/ 29 апреля 2016

В моем блоге показано, как это сделать как с Задачами, так и с Действиями, а также приведен пример проекта, который можно загрузить и запустить, чтобы увидеть оба в действии.

с действиями

Если вы используете действия, вы можете использовать встроенную функцию .Net Parallel.Invoke.Здесь мы ограничиваем его одновременной работой максимум 5 потоков.

var listOfActions = new List<Action>();
for (int i = 0; i < 100; i++)
{
    // Note that we create the Action here, but do not start it.
    listOfActions.Add(() => DoSomething());
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 5};
Parallel.Invoke(options, listOfActions.ToArray());

с задачами

Поскольку здесь вы используете задачи, встроенной функции нет.Тем не менее, вы можете использовать тот, который я предоставляю в своем блоге.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        StartAndWaitAllThrottled(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don&#39;t enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                throttler.Wait(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler&#39;s using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            Task.WaitAll(postTaskTasks.ToArray(), cancellationToken);
        }
    }

И затем создать свой список задач и вызвать функцию для их запуска, скажем, максимум 5 одновременных одновременно,Вы могли бы сделать это:

var listOfTasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
    var count = i;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => Something()));
}
Tasks.StartAndWaitAllThrottled(listOfTasks, 5);
3 голосов
/ 27 мая 2010

Краткий ответ: Если вы хотите ограничить количество рабочих задач, чтобы они не насыщали ваш веб-сервис, то я думаю, что ваш подход в порядке.

Длинный ответ: Новый движок System.Threading.Tasks в .NET 4.0 работает поверх .NET ThreadPool. Поскольку в каждом процессе существует только один поток ThreadPool, по умолчанию используется не более 250 рабочих потоков. Следовательно, если вы установите максимальное количество потоков в ThreadPool на более скромное число, вы сможете уменьшить количество одновременно выполняющихся потоков и, следовательно, задач, используя ThreadPool.SetMaxThreads (...) API.

ОДНАКО, обратите внимание, что вы, возможно, не одиноки в использовании ThreadPool, так как многие другие используемые вами классы также могут ставить элементы в очередь в ThreadPool. Следовательно, есть большая вероятность, что вы можете нанести вред остальной части своего приложения, делая это. Также обратите внимание, что поскольку ThreadPool использует алгоритм для оптимизации использования базовых ядер данной машины, ограничение числа потоков, которые поток потоков может поставить в очередь до сколь угодно низкого числа, может привести к некоторым довольно катастрофическим проблемам с производительностью.

Опять же, если вы хотите выполнить небольшое количество рабочих задач / потоков для выполнения какой-либо задачи, то лучшим решением будет только создание небольшого количества задач (по сравнению с 100).

0 голосов
/ 18 апреля 2012

Если ваша программа использует веб-сервисы, количество одновременных подключений будет ограничено значением ServicePointManager.DefaultConnectionLimit.Если вы хотите 5 одновременных подключений, недостаточно использовать решение Arrow_Raider.Вам также следует увеличить ServicePointManager.DefaultConnectionLimit, поскольку по умолчанию оно равно 2.

0 голосов
/ 24 мая 2010

Это не похоже на это, хотя вы можете создать подкласс TaskScheduler, который реализует такое поведение.

...