Parallel.Foreach порождает слишком много потоков - PullRequest
13 голосов
/ 05 января 2010

проблема

Хотя код, о котором я расскажу здесь, я написал на F #, он основан на .NET 4 framework, не зависящем конкретно от какой-либо особенности F # (по крайней мере, так кажется!).

У меня есть некоторые данные на диске, которые я должен обновить из сети, сохранив последнюю версию на диск:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

Проблема в том, что для loadAndSaveAndUpdate всех моих данных мне нужно будет выполнить функцию много раз:

{1 .. 5000} |> loadAndSaveAndUpdate

Каждый шаг будет делать

  • некоторый дисковый ввод-вывод,
  • некоторые данные хрустят,
  • некоторый сетевой ввод-вывод (с большой задержкой),
  • больше хруста данных,
  • и некоторый дисковый ввод-вывод.

Не было бы неплохо, если бы это было сделано параллельно, до некоторой степени? К сожалению, ни одна из моих функций чтения и синтаксического анализа не "готова к асинхронным рабочим процессам".

Первые (не очень хорошие) решения, которые я придумал

Задачи

Первым делом я настроил Task[] и запустил их все:

let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

Затем я нажимаю CTRL + ESC, чтобы посмотреть, сколько потоков он использовал. 15, 17, ..., 35, ..., 170, ... пока не убили приложение! Что-то пошло не так.

Параллельное

Я сделал почти то же самое, но использовал Parallel.ForEach(...), и результаты были одинаковыми: много-много-много-много тем.

Решение, которое работает ... вроде

Тогда я решил запустить только n темы, Task.WaitAll(of them), затем другие n, пока не осталось доступных задач.

Это работает, но проблема в том, что когда он завершит обработку, скажем, n-1 задач, он будет ждать, ждать, ждать последнюю чертову задачу, которая настаивает на блокировке из-за большой задержки в сети. Это не хорошо!

Итак, как бы вы атаковали эту проблему ? Буду признателен за просмотр различных решений, включая асинхронные рабочие процессы (и в данном случае, как адаптировать мои неасинхронные функции), параллельные расширения, странные параллельные шаблоны и т. Д.

Спасибо.

Ответы [ 4 ]

11 голосов
/ 26 мая 2010

ParallelOptions.MaxDegreeOfParallelism ограничивает количество параллельных операций, выполняемых вызовами метода Parallel

10 голосов
/ 05 января 2010

Использование «async» позволит вам выполнять работу, связанную с вводом / выводом, без прожига нитей, пока различные вызовы ввода / вывода находятся «в море», так что это будет моим первым предложением. Преобразование кода в асинхронный должно быть простым, обычно в соответствии с

  • Оберните каждое тело функции в async{...}, добавьте return, где необходимо
  • создание асинхронных версий любых примитивов ввода / вывода, которых еще нет в библиотеке, через Async.FromBeginEnd
  • Переключение вызовов вида let r = Foo() на let! r = AsyncFoo()
  • Используйте Async.Parallel для преобразования 5000 асинхронных объектов в один асинхронный, который работает параллельно

Существуют различные учебники для этого; одна такая трансляция здесь здесь .

7 голосов
/ 08 января 2010

Вы уверены, что ваши индивидуальные задачи выполняются своевременно? Я считаю, что и класс Parallel.ForEach, и класс Task уже используют пул потоков .NET. Задачи, как правило, должны быть недолговечными рабочими элементами, и в этом случае пул потоков будет порождать только небольшое количество реальных потоков, но если ваши задачи не выполняются, и есть другие задачи, поставленные в очередь, тогда число используемых потоков будет постоянно увеличиваться до максимум (который по умолчанию равен 250 / процессор в .NET 2.0 SP1, но отличается в разных версиях фреймворка). Стоит также отметить, что (по крайней мере, в .NET 2.0 SP1) создание новых потоков сокращается до 2 новых потоков в секунду, поэтому получение количества потоков, которые вы видите, указывает на то, что задачи не завершаются за короткий промежуток времени. время (поэтому, возможно, не совсем точно обвинить Parallel.ForEach).

Я думаю, что предложение Брайана использовать async рабочие процессы является хорошим, особенно если источником долгоживущих задач является IO, поскольку async вернет ваши потоки в пул потоков, пока IO не завершится. Другой вариант - просто признать, что ваши задачи не выполняются быстро, и разрешить порождение множества потоков (которые можно контролировать в некоторой степени с помощью System.Threading.ThreadPool.SetMaxThreads) - в зависимости от вашей ситуации это может не иметь большого значения, что вы ' Вы используете много тем.

0 голосов
/ 05 января 2010

Вы всегда можете использовать ThreadPool.

http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx

в основном:

  1. Создать пул потоков
  2. Установить максимальное количество потоков
  3. Поставить в очередь все задачи, используя QueueUserWorkItem(WaitCallback)
...