Как добиться асинхронности вместо параллелизма в F # - PullRequest
8 голосов
/ 30 ноября 2010

(придерживаясь общего примера с асинхронной выборкой многих веб-страниц)

Как бы я асинхронно выделил несколько (сотни) запросов веб-страниц, а затем дождался завершения всех запросов, прежде чем перейти к следующему шагу? Async.AsParallel обрабатывает несколько запросов одновременно, контролируемых количеством ядер на процессоре. Захват веб-страницы не является операцией, связанной с процессором. Не устраивает ускорение Async.AsParallel, я ищу альтернативы.

Я попытался соединить точки между Async.StartAsTask и Task []. WaitAll. Инстинктивно я написал следующий код, но он не компилируется.

let processItemsConcurrently (items : int seq) = 
  let tasks = items |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item))
  Tasks.Task.WaitAll(tasks) 

Как бы вы подошли к этому?

Ответы [ 5 ]

8 голосов
/ 30 ноября 2010

Async.Parallel почти наверняка прямо здесь.Не уверен, что ты не доволен;Сила асинхронности F # заключается скорее в асинхронных вычислениях, чем в параллельных задачам процессоре, связанном с процессором (который более приспособлен к Task s и .NET 4.0 TPL).Вот полный пример:

open System.Diagnostics 
open System.IO
open System.Net
open Microsoft.FSharp.Control.WebExtensions 

let sites = [|
    "http://bing.com"
    "http://google.com"
    "http://cnn.com"
    "http://stackoverflow.com"
    "http://yahoo.com"
    "http://msdn.com"
    "http://microsoft.com"
    "http://apple.com"
    "http://nfl.com"
    "http://amazon.com"
    "http://ebay.com"
    "http://expedia.com"
    "http://twitter.com"
    "http://reddit.com"
    "http://hulu.com"
    "http://youtube.com"
    "http://wikipedia.org"
    "http://live.com"
    "http://msn.com"
    "http://wordpress.com"
    |]

let print s = 
    // careful, don't create a synchronization bottleneck by printing
    //printf "%s" s
    ()

let printSummary info fullTimeMs =
    Array.sortInPlaceBy (fun (i,_,_) -> i) info
//  for i, size, time in info do
//      printfn "%2d  %7d  %5d" i size time
    let longest = info |> Array.map (fun (_,_,time) -> time) |> Array.max
    printfn "longest request took %dms" longest
    let bytes = info |> Array.sumBy (fun (_,size,_) -> float size)
    let seconds = float fullTimeMs / 1000.
    printfn "sucked down %7.2f KB/s" (bytes / 1024.0 / seconds)

let FetchAllSync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url ->
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use resp = req.GetResponse()
        use stream = resp.GetResponseStream()
        use reader = new StreamReader(stream,
                            System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let contents = reader.ReadToEnd()
        print "r"
        i, contents.Length, sw.ElapsedMilliseconds)
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

let FetchAllAsync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url -> async {
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use! resp = req.AsyncGetResponse()
        use stream = resp.GetResponseStream()
        use reader = new AsyncStreamReader(stream, // F# PowerPack
                           System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let! contents = reader.ReadToEnd()  // in F# PowerPack
        print "r"
        return i, contents.Length, sw.ElapsedMilliseconds })
                    |> Async.Parallel 
                    |> Async.RunSynchronously 
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

// By default, I think .NET limits you to 2 open connections at once
ServicePointManager.DefaultConnectionLimit <- sites.Length 

for i in 1..3 do // to warmup and show variance
    let time1,r1 = FetchAllSync()
    printfn "Sync took %dms, result was %d" time1 r1
    let time2,r2 = FetchAllAsync()
    printfn "Async took %dms, result was %d  (speedup=%2.2f)" 
        time2 r2 (float time1/ float time2)
    printfn ""

На моем 4-ядерном корпусе это дает почти четырехкратное ускорение.

РЕДАКТИРОВАТЬ

В ответ на ваш комментарий ямы обновили код.Вы правы в том, что я добавил больше сайтов и не вижу ожидаемого ускорения (по-прежнему стабильно около 4х).Я начал добавлять небольшой отладочный вывод выше, продолжу исследовать, чтобы проверить, не мешает ли что-то еще соединениям ...

EDIT

Снова отредактировал код.Ну, я нашел то, что может быть узким местом.Вот реализация AsyncReadToEnd в PowerPack:

type System.IO.StreamReader with
   member s.AsyncReadToEnd () = 
       FileExtensions.UnblockViaNewThread (fun () -> s.ReadToEnd())

Другими словами, он просто блокирует поток пула потоков и читает синхронно.Argh !!!Дайте мне посмотреть, смогу ли я обойти это.

EDIT

Хорошо, AsyncStreamReader в PowerPack делает правильные вещи, и я использую это сейчас.

Однакоключевая проблема, кажется, дисперсия .

Когда вы нажимаете, скажем, на cnn.com, большая часть времени возвращается как 500 мс.Но время от времени вы получаете один запрос, который занимает 4 секунды, и это, конечно, потенциально убивает кажущееся асинхронное перфорирование, поскольку общее время - это время самого неудачного запроса.

Запустив программу выше, яувидеть ускорения от 2,5 до 9 раз на моем 2-ядерном корпусе дома.Хотя это очень сильно варьируется.Все еще возможно, что в программе есть какие-то узкие места, которые я пропустил, но я думаю, что разница в сети может объяснить все, что я вижу на данный момент.

2 голосов
/ 01 декабря 2010

Держу пари, что ускорение, которое вы испытываете, недостаточно для вашего вкуса, потому что вы используете подтип WebRequest или класс, зависящий от него (например, WebClient).Если это так, вам нужно установить MaxConnection на ConnectionManagementElement (и я предлагаю вам установить его только в случае необходимости, иначе это станет довольно трудоемкой операцией) на высокое значение, в зависимости от числаодновременных соединений, которые вы хотите инициировать из своего приложения.

2 голосов
/ 30 ноября 2010

Используя Reactive Extensions для .NET в сочетании с F #, вы можете написать очень элегантное решение - посмотрите пример на http://blog.paulbetts.org/index.php/2010/11/16/making-async-io-work-for-you-reactive-style/ (здесь используется C #, но использовать F # тоже легко; ключ - использоватьМетоды Begin / End вместо метода sync, который, даже если вы можете заставить его скомпилироваться, заблокирует потоки n ThreadPool без необходимости, вместо того, чтобы Threadpool просто выбирал процедуры завершения по мере их поступления)

1 голос
/ 01 декабря 2010

Вот некоторый код, который избегает неизвестных, таких как задержка веб-доступа.Я получаю менее 5% загрузки ЦП и около 60-80% эффективности как для синхронизации, так и для асинхронных путей кода.

open System.Diagnostics

let numWorkers = 200
let asyncDelay = 50

let main =
   let codeBlocks = [for i in 1..numWorkers -> 
                        async { do! Async.Sleep asyncDelay } ]

   while true do
      printfn "Concurrent started..."
      let sw = new Stopwatch()
      sw.Start()
      codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore
      sw.Stop()
      printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * 100) / sw.ElapsedMilliseconds)

      printfn "Synchronous started..."
      let sw = new Stopwatch()
      sw.Start()
      for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore
      sw.Stop()
      printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100) / sw.ElapsedMilliseconds)

main
1 голос
/ 30 ноября 2010

Я не F # парень, но с точки зрения чистого .NET вы ищете TaskFactory :: FromAsync, где асинхронный вызов, который вы бы включили в задачу, будет выглядеть как HttpRequest :: BeginGetResponse. Вы также можете обернуть модель EAP, которую предоставляет WebClient, с помощью TaskCompletionSource. Подробнее об этих темах в MSDN.

Надеюсь, благодаря этим знаниям вы сможете найти ближайший нативный подход F #, чтобы выполнить то, что вы пытаетесь сделать.

...