Ограничить количество параллельных потоков в C # - PullRequest
22 голосов
/ 13 января 2012

Я пишу программу на C # для генерации и загрузки полумиллиона файлов по FTP. Я хочу обрабатывать 4 файла параллельно, так как машина имеет 4 ядра, а генерация файлов занимает гораздо больше времени. Можно ли преобразовать следующий пример Powershell в C #? Или есть какой-нибудь лучший фреймворк, такой как Actor, в C # (например, F # MailboxProcessor)?

Пример Powershell

$maxConcurrentJobs = 3;

# Read the input and queue it up
$jobInput = get-content .\input.txt
$queue = [System.Collections.Queue]::Synchronized( (New-Object System.Collections.Queue) )
foreach($item in $jobInput)
{
    $queue.Enqueue($item)
}

# Function that pops input off the queue and starts a job with it
function RunJobFromQueue
{
    if( $queue.Count -gt 0)
    {
        $j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue()
        Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null
    }
}

# Start up to the max number of concurrent jobs
# Each job will take care of running the rest
for( $i = 0; $i -lt $maxConcurrentJobs; $i++ )
{
    RunJobFromQueue
}

Обновление:
Подключение к удаленному FTP-серверу может быть медленным, поэтому я хочу ограничить обработку загрузки по FTP.

Ответы [ 5 ]

31 голосов
/ 13 января 2012

Предполагая, что вы строите это с помощью TPL, вы можете установить ParallelOptions.MaxDegreesOfParallelism на любое значение, которое вы хотите.

Parallel.For для примера кода.

17 голосов
/ 13 января 2012

Task Parallel Library ваш друг здесь. См. эту ссылку, которая описывает, что доступно для вас. В основном Framework 4 поставляется с ним, который оптимизирует эти по существу фоновые потоки, объединенные в потоки, для числа процессоров на работающей машине.

Возможно, что-то вроде:

ParallelOptions options = new ParallelOptions();

options.MaxDegreeOfParallelism = 4;

Тогда в вашем цикле что-то вроде:

Parallel.Invoke(options,
 () => new WebClient().Upload("http://www.linqpad.net", "lp.html"),
 () => new WebClient().Upload("http://www.jaoo.dk", "jaoo.html"));
5 голосов
/ 13 января 2012

Если вы используете .Net 4.0, вы можете использовать библиотеку Parallel

Предположим, что вы выполняете итерацию через полмиллиона файлов, которые вы можете "выполнить параллельно" итерации, используя Параллельный Foreach, например , или вы можете взглянуть на PLinq Здесь сравнение между двумя

2 голосов
/ 29 апреля 2016

По сути, вы захотите создать действие или задачу для каждого загружаемого файла, поместить их в список, а затем обработать этот список, ограничив число, которое может обрабатываться параллельно.

В моем блоге показано, как это сделать как с Задачами, так и с Действиями, и представлен пример проекта, который можно загрузить и запустить, чтобы увидеть оба в действии.

С действиями

При использовании действий можно использовать встроенную функцию .Net Parallel.Invoke.Здесь мы ограничиваем его одновременной работой максимум 4 потоков.

var listOfActions = new List<Action>();
foreach (var file in files)
{
    var localFile = file;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => UploadFile(localFile)));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 4};
Parallel.Invoke(options, listOfActions.ToArray());

Эта опция не поддерживает асинхронность, и я предполагаю, что у вас будет функция FileUpload, так что вы можете захотеть использоватьПример задачи ниже.

С Задачами

С Задачами нет встроенной функции.Однако вы можете использовать тот, который я предоставляю в своем блоге.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

И затем создать свой список задач и вызвать функцию для их запуска, скажем, максимум 4 одновременно,Вы можете сделать это:

var listOfTasks = new List<Task>();
foreach (var file in files)
{
    var localFile = file;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(async () => await UploadFile(localFile)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4);

Кроме того, поскольку этот метод поддерживает асинхронный режим, он не будет блокировать поток пользовательского интерфейса, как это было бы при использовании Parallel.Invoke или Parallel.ForEach.

0 голосов
/ 13 ноября 2016

Я кодировал ниже метод, где я использую BlockingCollection в качестве менеджера подсчета потоков.Это довольно просто реализовать и справиться с работой.Он просто принимает объекты Task и добавляет целое значение в список блокировки, увеличивая количество запущенных потоков на 1. Когда поток завершается, он удаляет объект из очереди и освобождает блок при операции добавления для предстоящих задач.

        public class BlockingTaskQueue
        {
            private BlockingCollection<int> threadManager { get; set; } = null;
            public bool IsWorking
            {
                get
                {
                    return threadManager.Count > 0 ? true : false;
                }
            }

            public BlockingTaskQueue(int maxThread)
            {
                threadManager = new BlockingCollection<int>(maxThread);
            }

            public async Task AddTask(Task task)
            {
                Task.Run(() =>
                {
                    Run(task);
                });
            }

            private bool Run(Task task)
            {
                try
                {
                    threadManager.Add(1);
                    task.Start();
                    task.Wait();
                    return true;

                }
                catch (Exception ex)
                {
                    return false;
                }
                finally
                {
                    threadManager.Take();
                }

            }

        }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...