Ограничивает ли Parallel.ForEach количество активных потоков? - PullRequest
98 голосов
/ 11 июля 2009

Учитывая этот код:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

Будут ли все 1000 потоков появляться почти одновременно?

Ответы [ 5 ]

139 голосов
/ 11 июля 2009

Нет, он не запустит 1000 потоков - да, он ограничит количество используемых потоков. Parallel Extensions использует соответствующее количество ядер, исходя из того, сколько у вас физически и , сколько уже занято. Он распределяет работу для каждого ядра, а затем использует технику, называемую похищение работы , чтобы позволить каждому потоку эффективно обрабатывать свою собственную очередь, и ему нужен только дорогой межпотоковый доступ, когда это действительно необходимо.

Загляните в Блог команды PFX для загрузки информации о том, как она распределяет работу, и по всем другим темам.

Обратите внимание, что в некоторых случаях вы также можете указать желаемую степень параллелизма.

26 голосов
/ 12 января 2011

На одноядерном компьютере ... Parallel.ForEach разделы (чанки) коллекции, в которой она работает, между несколькими потоками, но это число рассчитывается на основе алгоритма, который учитывает и, по-видимому, постоянно отслеживает работу сделано потоками, которые он выделяет для ForEach. Таким образом, , если часть тела ForEach вызывает долго выполняемые функции IO-привязки / блокировки, которые заставили бы поток ждать вокруг, алгоритм порождает больше потоков и перераспределяет коллекцию между ними . Если потоки завершаются быстро и не блокируются, например, на потоках ввода-вывода, например, просто вычисляя некоторые числа, алгоритм будет увеличивать (или даже уменьшать) количество потоков до точки, где алгоритм считает оптимальной для пропускной способности (среднее время завершения каждой итерации) .

По сути, пул потоков за всеми различными функциями библиотеки Parallel будет определять оптимальное количество потоков для использования. Количество ядер физического процессора составляет только часть уравнения. Между числом ядер и количеством порождаемых потоков НЕ существует простого отношения один к одному.

Я не нахожу документацию по отмене и обработке синхронизирующих потоков очень полезной. Надеюсь, MS сможет предоставить лучшие примеры в MSDN.

Не забывайте, что код тела должен быть написан для выполнения в нескольких потоках, наряду со всеми обычными соображениями безопасности потоков, среда не абстрагирует этот фактор ... пока.

5 голосов
/ 11 июля 2009

См. Использует ли Parallel.For одну задачу на итерацию? для идеи использования "ментальной модели". Однако автор заявляет, что «в конце концов, важно помнить, что детали реализации могут измениться в любое время».

5 голосов
/ 11 июля 2009

Определяет оптимальное количество потоков в зависимости от количества процессоров / ядер. Не все они появятся одновременно.

3 голосов
/ 15 ноября 2017

Отличный вопрос. В вашем примере уровень распараллеливания довольно низок даже на четырехъядерном процессоре, но с некоторым ожиданием уровень распараллеливания может стать довольно высоким.

// Max concurrency: 5
[Test]
public void Memory_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Теперь посмотрите, что происходит, когда добавляется ожидающая операция для имитации HTTP-запроса.

// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Я еще не внес никаких изменений, и уровень параллелизма / распараллеливания резко вырос. Лимит параллелизма может быть увеличен с ParallelOptions.MaxDegreeOfParallelism.

// Max concurrency: 43
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

// Max concurrency: 391
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(100000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Рекомендую настройку ParallelOptions.MaxDegreeOfParallelism. Это не обязательно увеличит количество используемых потоков, но обеспечит запуск только нормального количества потоков, что, по-видимому, является вашей заботой.

Наконец, чтобы ответить на ваш вопрос, нет, вы не получите все темы для запуска сразу. Используйте Parallel.Invoke, если вы ищете для параллельного запуска, например, тестирование условий гонки.

// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
    ConcurrentBag<string> monitor = new ConcurrentBag<string>();
    ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(DateTime.UtcNow.Ticks.ToString());
        monitor.TryTake(out string result);
        monitorOut.Add(result);
    });

    var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
    Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}
...