Скомпилированная консольная программа командной строки не ожидает завершения всех потоков - PullRequest
0 голосов
/ 10 июля 2011

Некоторые потоки будут прерваны до завершения, если код скомпилирован в консольную программу или запущен как fsi --use: Program.fs --exec --quiet .Есть ли способ дождаться окончания всех потоков?

Эта проблема может быть описана как « проблема с выходом из программы, когда существует несколько MailboxProcessers ».

Пример вывода

(Обратите внимание, что последняя строка усекается, а последняя функция вывода (printfn "[Main] after crawl") никогда не выполняется.)

[Main] before crawl
[Crawl] before return result
http://news.google.com crawled by agent 1.
[supervisor] reached limit
Agent 5 is done.
http://www.gstatic.com/news/img/favicon.ico crawled by agent 1.
[supervisor] reached limit
Agent 1 is done.
http://www.google.com/imghp?hl=en&tab=ni crawled by agent 4.
[supervisor] reached limit
Agent 4 is done.
http://www.google.com/webhp?hl=en&tab=nw crawled by agent 2.
[supervisor] reached limit
Agent 2 is done.
http://news.google.

Код

Редактировать: добавлено несколько System.Threading.Thread.CurrentThread.IsBackground <- false.

open System
open System.Collections.Concurrent
open System.Collections.Generic
open System.IO
open System.Net
open System.Text.RegularExpressions

module Helpers =

    type Message =
        | Done
        | Mailbox of MailboxProcessor<Message>
        | Stop
        | Url of string option
        | Start of AsyncReplyChannel<unit>

    // Gates the number of crawling agents.
    [<Literal>]
    let Gate = 5

    // Extracts links from HTML.
    let extractLinks html =
        let pattern1 = "(?i)href\\s*=\\s*(\"|\')/?((?!#.*|/\B|" + 
                       "mailto:|location\.|javascript:)[^\"\']+)(\"|\')"
        let pattern2 = "(?i)^https?"

        let links =
            [
                for x in Regex(pattern1).Matches(html) do
                    yield x.Groups.[2].Value
            ]
            |> List.filter (fun x -> Regex(pattern2).IsMatch(x))
        links

    // Fetches a Web page.
    let fetch (url : string) =
        try
            let req = WebRequest.Create(url) :?> HttpWebRequest
            req.UserAgent <- "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)"
            req.Timeout <- 5000
            use resp = req.GetResponse()
            let content = resp.ContentType
            let isHtml = Regex("html").IsMatch(content)
            match isHtml with
            | true -> use stream = resp.GetResponseStream()
                      use reader = new StreamReader(stream)
                      let html = reader.ReadToEnd()
                      Some html
            | false -> None
        with
        | _ -> None

    let collectLinks url =
        let html = fetch url
        match html with
        | Some x -> extractLinks x
        | None -> []

open Helpers

// Creates a mailbox that synchronizes printing to the console (so 
// that two calls to 'printfn' do not interleave when printing)
let printer = 
    MailboxProcessor.Start(fun x -> async {
        while true do 
        let! str = x.Receive()
        System.Threading.Thread.CurrentThread.IsBackground <- false
        printfn "%s" str })
// Hides standard 'printfn' function (formats the string using 
// 'kprintf' and then posts the result to the printer agent.
let printfn fmt = 
    Printf.kprintf printer.Post fmt

let crawl url limit = 
    // Concurrent queue for saving collected urls.
    let q = ConcurrentQueue<string>()

    // Holds crawled URLs.
    let set = HashSet<string>()


    let supervisor =
        MailboxProcessor.Start(fun x -> async {
            System.Threading.Thread.CurrentThread.IsBackground <- false
            // The agent expects to receive 'Start' message first - the message
            // carries a reply channel that is used to notify the caller
            // when the agent completes crawling.
            let! start = x.Receive()
            let repl =
              match start with
              | Start repl -> repl
              | _ -> failwith "Expected Start message!"

            let rec loop run =
                async {
                    let! msg = x.Receive()
                    match msg with
                    | Mailbox(mailbox) -> 
                        let count = set.Count
                        if count < limit - 1 && run then 
                            let url = q.TryDequeue()
                            match url with
                            | true, str -> if not (set.Contains str) then
                                                let set'= set.Add str
                                                mailbox.Post <| Url(Some str)
                                                return! loop run
                                            else
                                                mailbox.Post <| Url None
                                                return! loop run

                            | _ -> mailbox.Post <| Url None
                                   return! loop run
                        else
                            printfn "[supervisor] reached limit" 
                            // Wait for finishing
                            mailbox.Post Stop
                            return! loop run
                    | Stop -> printfn "[Supervisor] stop"; return! loop false
                    | Start _ -> failwith "Unexpected start message!"
                    | Url _ -> failwith "Unexpected URL message!"
                    | Done -> printfn "[Supervisor] Supervisor is done."
                              (x :> IDisposable).Dispose()
                              // Notify the caller that the agent has completed
                              repl.Reply(())
                }
            do! loop true })


    let urlCollector =
        MailboxProcessor.Start(fun y ->
            let rec loop count =
                async {
                    System.Threading.Thread.CurrentThread.IsBackground <- false
                    let! msg = y.TryReceive(6000)
                    match msg with
                    | Some message ->
                        match message with
                        | Url u ->
                            match u with
                            | Some url -> q.Enqueue url
                                          return! loop count
                            | None -> return! loop count
                        | _ ->
                            match count with
                            | Gate -> (y :> IDisposable).Dispose()
                                      printfn "[urlCollector] URL collector is done."
                                      supervisor.Post Done
                            | _ -> return! loop (count + 1)
                    | None -> supervisor.Post Stop
                              return! loop count
                }
            loop 1)

    /// Initializes a crawling agent.
    let crawler id =
        MailboxProcessor.Start(fun inbox ->
            let rec loop() =
                async {
                    System.Threading.Thread.CurrentThread.IsBackground <- false
                    let! msg = inbox.Receive()
                    match msg with
                    | Url x ->
                        match x with
                        | Some url -> 
                                let links = collectLinks url
                                printfn "%s crawled by agent %d." url id
                                for link in links do
                                    urlCollector.Post <| Url (Some link)
                                supervisor.Post(Mailbox(inbox))
                                return! loop()
                        | None -> supervisor.Post(Mailbox(inbox))
                                  return! loop()
                    | _ -> printfn "Agent %d is done." id
                           urlCollector.Post Done
                           (inbox :> IDisposable).Dispose()
                    }
            loop())

    // Send 'Start' message to the main agent. The result
    // is asynchronous workflow that will complete when the
    // agent crawling completes
    let result = supervisor.PostAndAsyncReply(Start)
    // Spawn the crawlers.
    let crawlers = 
        [
            for i in 1 .. Gate do
                yield crawler i
        ]

    // Post the first messages.
    crawlers.Head.Post <| Url (Some url)
    crawlers.Tail |> List.iter (fun ag -> ag.Post <| Url None) 
    printfn "[Crawl] before return result"
    result

// Example:
printfn "[Main] before crawl"
crawl "http://news.google.com" 5
|> Async.RunSynchronously
printfn "[Main] after crawl"

Ответы [ 4 ]

3 голосов
/ 11 июля 2011

Если я правильно распознаю код, он основан на вашем предыдущем вопросе (и моем ответе).

Программа ожидает, пока supervisor agent не завершится (отправив сообщение Start и затем ожидая ответа, используя RunSynchronously).Это должно гарантировать, что основной агент, а также все сканеры завершают работу до выхода из приложения.

Проблема заключается в том, что он не ожидает завершения работы агента printer!Итак, последний вызов (переопределенной) функции printfn отправляет сообщение агенту, а затем приложение завершается, не дожидаясь окончания работы агента печати. ​​

Насколько я знаю, «стандарт» отсутствуетшаблон "для ожидания, пока агент не завершит обработку всех сообщений, находящихся в данный момент в очереди.Вот некоторые идеи, которые вы можете попробовать:

  • Вы можете проверить свойство CurrentQueueLength (подождите, пока оно не станет 0), но это все еще не означает, что агент завершен обработка всех сообщений.

  • Вы можете сделать агента более сложным, добавив новый тип сообщения и подождать, пока агент ответит на это сообщение (точно так же, как вы ожидаете ответа на * 1026).* сообщение).

0 голосов
/ 12 июля 2011

Я думаю, что вроде как решил проблему: добавив System.Threading.Thread.CurrentThread.IsBackground <- false после let! в агенте принтера.

Однако я попытался изменить исходный код (первую версию до исправления AsyncChannel Томаса), добавив System.Threading.Thread.CurrentThread.IsBackground <- false после всех let!, и он все еще не работает. Понятия не имею.

Спасибо всем за вашу помощь. Наконец-то я могу запустить свое первое приложение F # для пакетного процесса. Я думаю, что MailboxProcessor действительно должен установить IsBackground в false по умолчанию. В любом случае, чтобы попросить Microsoft изменить его.

[Обновление] Только что обнаружил, что скомпилированная сборка работает хорошо. Но fsi --user:Program --exec --quiet все тот же. Кажется, ошибка в ФСИ?

0 голосов
/ 11 июля 2011

.NET-потоки имеют свойство Thread.IsBackground, если для этого параметра установлено значение true, поток не будет препятствовать выходу процесса. Если установлено значение false, процесс не будет завершен. Смотри: http://msdn.microsoft.com/en-us/library/system.threading.thread.isbackground.aspx

Потоки, которые запускают агенты, поступают из пула потоков, поэтому для Thread.IsBackground по умолчанию установлено значение false.

Вы можете попытаться установить для IsBackground потока значение false каждый раз, когда читаете сообщение. Вы можете добавить функцию, чтобы сделать это, чтобы сделать подход более чистым. Возможно, это не лучшее решение проблемы, так как каждый раз, когда вы используете let! Вы можете изменить потоки, так что это должно быть тщательно реализовано для правильной работы. Я просто подумал упомянуть это, чтобы ответить на конкретный вопрос

Можно ли дождаться окончания всех тем?

и помогает людям понять, почему одни потоки прекращают выход из программы, а другие - нет.

0 голосов
/ 10 июля 2011

Предупреждаю, что я знаю ноль F #, но обычно вы ожидаете всех интересующих потоков, используя Thread.Join . Мне кажется, что в вашем случае вам нужно подождать чего-нибудь интересного, что начнется с звонка на .Start.

Вы также можете рассмотреть Task Parallel Library, которая дает вам более высокую (более простую) абстракцию для необработанных управляемых потоков. Пример ожидания выполнения задач здесь .

...