Первый цикл MailboxProcessor не может быть запущен, если программа сразу не работает - PullRequest
3 голосов
/ 19 марта 2019

У меня есть команда, периодически запускающая проверку SFTP и записывающая результат в файл.

let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv = 
    try 
        sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
    with 
    | ex -> 
        ex.Message |> printerAgent.Post
        printfn "%s" ex.Message // <- NOTICE THIS LINE
    sw.Close()
    sw.Dispose()
0  

Зацикливается на MailboxProcessor

let printerAgent = MailboxProcessor.Start(fun inbox-> 
    // the message processing function
    let rec messageLoop() = async{        
        // read a message
        let! msg = inbox.Receive()
        // process a message
        sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), msg)
        printfn "%s" msg
        // loop to top
        return! messageLoop()  
        }
    // start the loop 
    messageLoop() 
    )

, который вызывается для записи сообщений в журнал

let sftpExample local host port username (password:string) =
    async {
        use client = new SftpClient(host, port, username, password)
        client.Connect()
        sprintf "Connected to %s\nroot dir list" host  |> printerAgent.Post
        do! downloadDir local client ""   
        sprintf "Done, disconnecting now" |> printerAgent.Post
        client.Disconnect()
    } |> Async.RunSynchronously

Загрузка файла асинхронный , а также соответствующие сообщения, но все, кажется, работает хорошо.

Проблема в том, что - если по каким-либо причинам соединение sftp немедленно завершается неудачно, MailboxProcessor не имеет временичтобы записать сообщение об исключении.

То, что я пытался сделать - что действительно работает - добавлял printfn "%s" ex.Message до конца: я просто хотел узнать, не представляет ли кто-нибудь лучшее решение.

К вашему сведению, полный код в этом гисте .

Ответы [ 2 ]

3 голосов
/ 19 марта 2019

Фактически, вам нужно, чтобы программа ожидала, пока MailboxProcessor завершит обработку всей своей очереди сообщений, прежде чем программа закроется. Кажется, ваш printfn "%s" ex.Message работает, но он не гарантированно работает: если у MailboxProcessor в очереди было несколько элементов, поток, выполняющий функцию printfn, мог бы завершиться до того, как поток MailboxProcessor успел пройти через все свои сообщения. .

Дизайн, который я бы порекомендовал, состоит в том, чтобы изменить вход вашего printerAgent на DU, как показано ниже:

type printerAgentMsg =
    | Message of string
    | Shutdown

Затем, когда вы хотите, чтобы агент принтера завершил отправку своих сообщений, используйте MailboxProcessor.PostAndReply (и обратите внимание на пример использования в документах) в функции main и отправьте ему сообщение Shutdown , Помните, что сообщения MailboxProcessor ставятся в очередь: к тому времени, как он получит сообщение Shutdown, он уже пройдет через остальные сообщения в очереди. Поэтому все, что нужно для обработки сообщения Shutdown, это вернуть ответ unit и просто больше не вызывать его цикл. И поскольку вы использовали PostAndReply вместо PostAndReplyAsync, функция main будет блокироваться до тех пор, пока MailboxProcessor не завершит всю свою работу. (Чтобы избежать любых шансов на вечную блокировку, я бы рекомендовал установить время ожидания, например, 10 секунд в вашем вызове PostAndReply; по умолчанию время ожидания равно -1, что означает ожидание навсегда).

РЕДАКТИРОВАТЬ: Вот пример (НЕ проверенный, используйте на свой страх и риск) того, что я имею в виду:

type printerAgentMsg =
    | Message of string
    | Shutdown of AsyncReplyChannel<unit>

let printerAgent = MailboxProcessor.Start(fun inbox-> 
    // the message processing function
    let rec messageLoop() = async{        
        // read a message
        let! msg = inbox.Receive()
        // process a message
        match msg with
        | Message text ->
            sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), text)
            printfn "%s" text
            // loop to top
            return! messageLoop()
        | Shutdown replyChannel ->
            replyChannel.Reply()
            // We do NOT do return! messageLoop() here
        }
    // start the loop 
    messageLoop() 
    )

let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv = 
    try 
        sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
    with 
    | ex -> 
        ex.Message |> Message |> printerAgent.Post
        printfn "%s" ex.Message // <- NOTICE THIS LINE
    printerAgent.PostAndReply( (fun replyChannel -> Shutdown replyChannel), 10000)  // Timeout = 10000 ms = 10 seconds
    sw.Close()
    sw.Dispose()
1 голос
/ 19 марта 2019

Самое простое решение - использовать обычную (синхронную) функцию для ведения журнала вместо MailboxProcessor или использовать некоторые каркасы ведения журнала и сбрасывать средства ведения журнала в конце основной функции.Если вы хотите продолжать использовать printingAgent, вы можете реализовать «синхронный» режим следующим образом:

type Msg =
    | Log of string
    | LogAndWait of string * AsyncReplyChannel<unit>

let printerAgent = MailboxProcessor.Start(fun inbox -> 
    let processLogMessage logMessage =
        sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), logMessage)
        printfn "%s" logMessage
    let rec messageLoop() = async{        
        let! msg = inbox.Receive()
        match msg with 
        | Log logMessage ->
            processLogMessage logMessage
        | LogAndWait (logMessage, replyChannel) ->
            processLogMessage logMessage
            replyChannel.Reply()
        return! messageLoop()  
        }
    messageLoop() 
    )

, который вы затем будете использовать либо асинхронно

printerAgent.Post(Log "Message")

, либо синхронно

printerAgent.PostAndReply(fun channel -> LogAndWait("Message", channel))

Вы должны использовать синхронную альтернативу, когда вы регистрируете исключение в основной функции.

...