Какой шаблон F # рекомендуется для этой ситуации? - PullRequest
1 голос
/ 18 августа 2011

У меня есть ситуация, похожая на следующую:

let mutable stopped = false

let runAsync() = async {
    while not stopped do
        let! item = fetchItemToProcessAsync
        match item with
        | Some job -> job |> runJobAsync |> Async.Start
        | None -> do! Async.Sleep(1000)
}

let run() = Async.Start runAsync
let stop() =
    stopped <- true

Теперь, когда вызывается метод stop, мне нужно прекратить чтение дополнительных элементов из БД, а также дождаться завершения тех, которые в данный момент запущены, прежде чем вернуться из этой функции.

Каков наилучший способ сделать это? Я думал об использовании счетчика (с блокированными API) и возврата из метода stop, когда счетчик достигнет 0.

Если есть альтернативный способ сделать это, я был бы признателен за руководство. У меня есть чувство, что я мог бы использовать агентов здесь, но я не уверен, есть ли какой-либо доступный способ сделать это с агентом или мне все еще нужно написать свою собственную логику, чтобы определить, что задания завершены.

Ответы [ 2 ]

1 голос
/ 18 августа 2011

Проверьте этот фрагмент на fssnip.net.Это стандартный процессор заданий, который вы можете использовать.

1 голос
/ 18 августа 2011

взгляните на шаблоны, основанные на акторах, и MailboxProcessor

, в основном вы можете представить это как асинхронную очередь.Если вы используете список выполнения (начинающийся с Async.StartChild или Async.StartAsTask ) в качестве параметра для вашего цикла внутри MailboxProcessor, вы можете корректно обрабатывать выключения с помощью wait или CancellationToken)

Вот небольшой пример, который я собрал:


type Commands = 
    | RunJob of Async
    | JobDone of int
    | Quit of AsyncReplyChannel

type JobRunner() =
    let processor =
        MailboxProcessor.Start (fun inbox ->
            let rec loop (nextId, jobs) = async {
                let! cmd = inbox.Receive()
                match cmd with
                | Quit cb ->
                    if not (Map.isEmpty jobs) 
                    then async {
                            do! Async.Sleep 100
                            inbox.Post (Quit cb)}
                        |> Async.Start
                        return! loop (nextId, jobs)
                    else 
                        cb.Reply()
                        return ()
                | JobDone id ->
                    return! loop (nextId, jobs |> Map.remove id)
                | RunJob job ->
                    let runJob i = async {
                        do! job
                        inbox.Post (JobDone i)
                    }
                    let! child = Async.StartChild (runJob nextId)
                    return! loop (nextId+1, jobs |> Map.add nextId child)
            }
            loop (0, Map.empty))
    member jr.PostJob(job) = processor.Post (RunJob job)
    member jr.Quit() = processor.PostAndReply(fun cb -> Quit cb)

let postWaitJob (jobRunner : JobRunner) time =
    let job = async {
        do! Async.Sleep time
        printfn "sleept for %d ms" time }
    jobRunner.PostJob job

let testRun() =
    let jr = new JobRunner()
    printfn "starting jobs..."
    [10..-1..1] |> List.iter (fun i -> postWaitJob jr (i*1000))
    printfn "sending quit"
    jr.Quit()
    printfn "done!"

Хм ... тут возникли некоторые проблемы с редактором: он просто убивает много кода, когда я использую конвейероператор ... grrr

Краткое объяснение: как вы видите, я всегда предоставляю внутренний цикл со следующим свободным идентификатором задания и картой заданий Id-> AsyncChild.(Вы, конечно, можете реализовать другие / лучшие решения - карта не обязательна в этом примере, но вы можете расширить ее с помощью команды «Отменить JobNr» или любым другим способом). Сообщение «Выполнено» используется только для удаления заданий из этой карты. Выйтипросто проверяет, является ли карта пустой - если она не требует дополнительной работы, и процессор Mailbox завершает работу (return ()) - если она не пуста, запускается новый Async-Child, который просто ждет 100 мс, а затем повторно отправляет сообщение Quit-Message RunJobэто довольно просто - он просто связывает данную работу с сообщением JobDone в цикл вызовов MessabeboxProcessor и recursivley с обновленными значениями (nextId равен единице, а новое задание сопоставляется со старым nextId)

...