Как ограничить количество потоков, созданных для асинхронной операции Seq.map в F #? - PullRequest
7 голосов
/ 18 сентября 2010

Текущая настройка выглядит примерно так

array
|> Seq.map (fun item -> async { return f item})
|> Async.Parallel
|> Async.RunSynchronously

Проблема в том, что это приводит к созданию слишком большого количества потоков и периодическому сбою приложения.

Как ограничить количество потоков вэто случай (скажем, Environment.ProcessorCount)?

Ответы [ 3 ]

3 голосов
/ 18 сентября 2010

Если вы хотите распараллелить вычисления с интенсивным использованием процессора, которые принимают массив (или любую последовательность) в качестве входных данных, то может быть лучше использовать модуль PSeq из F # PowerPack (которыйдоступно только на .NET 4.0, хотя).Он предоставляет параллельные версии многих стандартных Array.xyz функций.Для получения дополнительной информации вы также можете посмотреть F # перевод из Параллельное программирование с .NET сэмплами.

Код для решения вашей проблемы будет немного проще, чем при использовании рабочих процессов:

array |> PSeq.map f
      |> PSeq.toArray 

Некоторые различия между этими двумя опциями:

  • PSeq создается с использованием параллельной библиотеки задач (TPL) из .NET 4.0, которая оптимизирована для работы с большим количеством ресурсоемких задач.
  • Асинхронизация реализована в F #библиотеки и поддерживает асинхронные (неблокирующие) операции, такие как ввод / вывод в одновременно выполняемых операциях.

В итоге, если вам нужны асинхронные операции (например, ввод / вывод), тогда Async - этолучший вариантЕсли у вас большое количество задач, интенсивно использующих процессор, то PSeq может быть лучшим выбором (в .NET 4.0)

2 голосов
/ 09 апреля 2014

Вот рабочий пример того, как сделать это с помощью семафора, в духе предложения Брайана:

open System

let throttle n fs =
    seq { let n = new Threading.Semaphore(n, n)
          for f in fs ->
              async { let! ok = Async.AwaitWaitHandle(n)
                      let! result = Async.Catch f
                      n.Release() |> ignore
                      return match result with
                             | Choice1Of2 rslt -> rslt
                             | Choice2Of2 exn  -> raise exn
                    }
        }

let f i = async { printfn "start %d" i
                  do! Async.Sleep(2000)
                }
let fs = Seq.init 10 f

fs |> throttle 2 |> Async.Parallel |> Async.RunSynchronously |> ignore
1 голос
/ 18 сентября 2010

Есть пара вещей, которые вы можете сделать.

Во-первых, поскольку здесь используется ThreadPool, вы можете использовать ThreadPool.SetMaxThreads.

Во-вторых, вы можете ввести свой собственный газ вместе сэти строки:

let throttle = makeThrottle(8)
array 
|> Seq.map (fun item -> async { do! throttle.Wait()
                                return f item}) 
|> Async.Parallel 
|> Async.RunSynchronously 

makeThrottle() не будет слишком трудным для написания, но это приведет к небольшим накладным расходам синхронизации.Если вы пытаетесь распараллелить так много вещей, что у вас не хватает памяти, перегрузка газа, скорее всего, не будет проблемой.(Дайте мне знать, если вам нужен пример для такого рода кода.)

Наконец, если это действительно сбой, это пахнет, как будто вы делаете что-то не так.ThreadPool обычно (но не всегда) хорошо справляется с управлением.Но в различных обстоятельствах разработка собственного дросселя в любом случае может оказаться полезной для вашего приложения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...