Присоединяешься к первой законченной теме? - PullRequest
2 голосов
/ 24 июля 2011

Я пишу серию алгоритмов поиска графов на F # и подумал, что было бы неплохо воспользоваться преимуществами распараллеливания.Я хотел выполнить несколько потоков параллельно и взять результат первого, чтобы закончить.У меня есть реализация, но она не красивая.

Два вопроса: есть ли стандартное имя для такого рода функций?Не Join или JoinAll, но JoinFirst?Во-вторых, есть ли более идиоматический способ сделать это?

//implementation
let makeAsync (locker:obj) (shared:'a option ref) (f:unit->'a) =
    async {
        let result = f()
        Monitor.Enter locker
        shared := Some result
        Monitor.Pulse locker
        Monitor.Exit locker
    }

let firstFinished test work =
    let result = ref Option.None
    let locker = new obj()
    let cancel = new CancellationTokenSource()    
    work |> List.map (makeAsync locker result) |> List.map (fun a-> Async.StartAsTask(a, TaskCreationOptions.None, cancel.Token)) |> ignore
    Monitor.Enter locker
    while (result.Value.IsNone || (not <| test result.Value.Value)) do
        Monitor.Wait locker |> ignore
    Monitor.Exit locker
    cancel.Cancel()
    match result.Value with
    | Some x-> x
    | None -> failwith "Don't pass in an empty list"
//end implentation

//testing
let delayReturn (ms:int) value = 
    fun ()->
        Thread.Sleep ms
        value

let test () =
    let work = [ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ]
    let result = firstFinished (fun _->true) work
    printfn "%s" result

Ответы [ 4 ]

3 голосов
/ 24 июля 2011

Будет ли работать, чтобы передать CancellationTokenSource и test в каждый асинхронный и получить первый, который вычисляет действительный результат, отменить другие?

let makeAsync (cancel:CancellationTokenSource) test f =
  let rec loop() =
    async {
      if cancel.IsCancellationRequested then 
        return None
      else
        let result = f()
        if test result then
          cancel.Cancel()
          return Some result
        else return! loop()
    }
  loop()

let firstFinished test work =
  match work with
  | [] -> invalidArg "work" "Don't pass in an empty list"
  | _ ->
    let cancel = new CancellationTokenSource()    
    work
    |> Seq.map (makeAsync cancel test) 
    |> Seq.toArray
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Array.pick id

Этот подход имеет несколько улучшений: 1) этоиспользует только async (он не смешивается с Task, что является альтернативой для выполнения того же действия - async более идиоматичен в F #);2) нет общего состояния, кроме CancellationTokenSource, которое было разработано для этой цели;3) метод чистого объединения функций упрощает добавление дополнительной логики / преобразований в конвейер, включая тривиальное включение / отключение параллелизма.

2 голосов
/ 24 июля 2011

В параллельной библиотеке задач в .NET 4 это называется WaitAny.Например, следующий фрагмент создает 10 задач и ожидает завершения любой из них:

open System.Threading

Array.init 10 (fun _ ->
  Tasks.Task.Factory.StartNew(fun () ->
    Thread.Sleep 1000))
|> Tasks.Task.WaitAny
2 голосов
/ 24 июля 2011

К сожалению, встроенная операция для этого не предоставляется Async, но я все равно буду использовать асинхронные F #, потому что они напрямую поддерживают отмену.Когда вы запускаете рабочий процесс, используя Async.Start, вы можете передать ему токен отмены, и рабочий процесс автоматически остановится, если токен будет отменен.

Это означает, что вы должны запускать рабочие процессы явно (вместо использования * 1005).*), поэтому синхронизация должна быть написана от руки.Вот простая версия метода Async.Choice, который делает это (на данный момент он не обрабатывает исключения):

open System.Threading

type Microsoft.FSharp.Control.Async with
  /// Takes several asynchronous workflows and returns 
  /// the result of the first workflow that successfuly completes
  static member Choice(workflows) = 
    Async.FromContinuations(fun (cont, _, _) ->
      let cts = new CancellationTokenSource()
      let completed = ref false
      let lockObj = new obj()
      let synchronized f = lock lockObj f

      /// Called when a result is available - the function uses locks
      /// to make sure that it calls the continuation only once
      let completeOnce res =
        let run =
          synchronized(fun () ->
            if completed.Value then false
            else completed := true; true)
        if run then cont res

      /// Workflow that will be started for each argument - run the 
      /// operation, cancel pending workflows and then return result
      let runWorkflow workflow = async {
        let! res = workflow
        cts.Cancel()
        completeOnce res }

      // Start all workflows using cancellation token
      for work in workflows do
        Async.Start(runWorkflow work, cts.Token) )

Как только мы напишем эту операцию (которая немного сложна, но должнанаписать только один раз), решение проблемы довольно просто.Вы можете записать свои операции в виде асинхронных рабочих процессов, и они будут отменены автоматически после завершения первого:

let delayReturn n s = async {
  do! Async.Sleep(n) 
  printfn "returning %s" s
  return s }

Async.Choice [ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ]
|> Async.RunSynchronously

Когда вы запустите это, он напечатает только «возвращение первым!»потому что второй рабочий процесс будет отменен.

2 голосов
/ 24 июля 2011

Если вы можете использовать «Реактивные расширения (Rx)» в своем проекте, метод joinFirst можно реализовать следующим образом:

let joinFirst (f : (unit->'a) list) = 
    let c = new CancellationTokenSource()
    let o = f |> List.map (fun i ->
                    let j = fun() -> Async.RunSynchronously (async {return i() },-1,c.Token)
                    Observable.Defer(fun() -> Observable.Start(j))
                    )
            |> Observable.Amb
    let r = o.First()
    c.Cancel()
    r

Пример использования:

[20..30] |> List.map (fun i -> fun() -> Thread.Sleep(i*100); printfn "%d" i; i)
|> joinFirst |> printfn "Done %A"
Console.Read() |> ignore

Обновление:

Использование процессора почтовых ящиков:

type WorkMessage<'a> = 
      Done of 'a
    | GetFirstDone of AsyncReplyChannel<'a>

let joinFirst (f : (unit->'a) list) = 
    let c = new CancellationTokenSource()
    let m = MailboxProcessor<WorkMessage<'a>>.Start(
              fun mbox -> async { 
                let afterDone a m = 
                    match m with
                    | GetFirstDone rc -> 
                        rc.Reply(a);
                        Some(async {return ()})
                    | _ -> None
                let getDone m = 
                    match m with
                    |Done a -> 
                        c.Cancel()
                        Some (async {
                                do! mbox.Scan(afterDone a)
                                })  
                    |_ -> None
                do! mbox.Scan(getDone)
                return ()
             } )
    f 
    |> List.iter(fun t -> try 
                            Async.RunSynchronously (async {let out = t()
                                                           m.Post(Done out)
                                                           return ()},-1,c.Token)
                          with
                          _ -> ())
    m.PostAndReply(fun rc -> GetFirstDone rc)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...