F # CancellationTokenSource.Cancel () не отменяет основную работу - PullRequest
1 голос
/ 10 апреля 2020

У меня есть потенциально очень долго работающая функция, которая иногда может зависать. Итак, я подумал, что если я включу его в рабочий процесс async, то я смогу отменить его. Вот пример FSI, который не работает (но то же самое происходит с скомпилированным кодом):

open System.Threading

let mutable counter = 0

/// Emulates an external C# sync function that hung up.
/// Please, don't change it to some F# async stuff because
/// that won't fix that C# method.
let run() =
    while true
        do
            printfn "counter = %A" counter
            Thread.Sleep 1000
            counter <- counter + 1


let onRunModel() =
    let c = new CancellationTokenSource()
    let m = async { do run() }
    Async.Start (m, c.Token)
    c


let tryCancel() =
    printfn "Starting..."
    let c = onRunModel()
    printfn "Waiting..."
    Thread.Sleep 5000
    printfn "Cancelling..."
    c.Cancel()
    printfn "Waiting again..."
    Thread.Sleep 5000
    printfn "Completed."


#time
tryCancel()
#time

Если вы запустите его в FSI, вы увидите нечто подобное:

Starting...
Waiting...
counter = 0
counter = 1
counter = 2
counter = 3
counter = 4
Cancelling...
Waiting again...
counter = 5
counter = 6
counter = 7
counter = 8
counter = 9
Completed.
Real: 00:00:10.004, CPU: 00:00:00.062, GC gen0: 0, gen1: 0, gen2: 0
counter = 10
counter = 11
counter = 12
counter = 13
counter = 14
counter = 15
counter = 16

, что означает, что он вообще не останавливается после вызова c.Cancel().

Что я делаю не так и как заставить это работать?

Вот некоторая дополнительная информация :

  1. Когда код зависает, он делает это в какой-то внешней библиотеке syn c C#, которую я не могу контролировать. Поэтому проверка на токен отмены в коде, который я контролирую, бесполезна. Вот почему функция run() выше была смоделирована таким образом.
  2. Мне не нужно сообщение о завершении и / или прогрессе. Это уже сделано через какую-то систему обмена сообщениями, и это выходит за рамки вопроса.
  3. По сути, мне просто нужно убить фоновую работу, как только я "решу" сделать это.

Ответы [ 2 ]

1 голос
/ 10 апреля 2020

Вы передаете управление сегменту кода, который, хотя и заключен в блок async, не имеет средств для проверки отмены. Если вы создадите свой l oop, непосредственно обернутый в async, или замените его на рекурсивный async l oop, он будет работать, как и ожидалось:

let run0 () =   // does not cancel
    let counter = ref 0
    while true do
        printfn "(0) counter = %A" !counter
        Thread.Sleep 1000
        incr counter
let m = async { run0 () }

let run1 () =   // cancels
    let counter = ref 0
    async{
        while true do
            printfn "(1) counter = %A" !counter
            Thread.Sleep 1000
            incr counter }

let run2 =      // cancels too
    let rec aux counter = async {
        printfn "(2) counter = %A" counter
        Thread.Sleep 1000
        return! aux (counter + 1) }
    aux 0

printfn "Starting..."
let cts = new CancellationTokenSource()
Async.Start(m, cts.Token)
Async.Start(run1(), cts.Token)
Async.Start(run2, cts.Token)
printfn "Waiting..."
Thread.Sleep 5000
printfn "Cancelling..."
cts.Cancel()
printfn "Waiting again..."
Thread.Sleep 5000
printfn "Completed."

Слово предостережение: вложенные вызовы async в F # автоматически проверяются на отмену, поэтому предпочтительным является do! Async.Sleep. Если вы идете по рекурсивному маршруту, обязательно включите хвостовую рекурсию через return!. Дальнейшее чтение: блог Скотта В. о Асинхронное программирование и Асин c в C# и F # Асинхронные ошибки в C# Томаса Петрисека.

0 голосов
/ 10 апреля 2020

Этот кусок кода был разработан, чтобы решить ситуацию, когда я не мог получить некоторые вызовы для завершения / тайм-аута. Они бы просто повесили. Может быть, вы можете получить некоторые идеи, которые помогут вам решить вашу проблему.

Интересной частью для вас будут только две первые функции. Остальное только для демонстрации того, как я их использую.

module RobustTcp =

    open System
    open System.Text
    open System.Net.Sockets
    open Railway

    let private asyncSleep (sleepTime: int) (error: 'a) = async {
        do! Async.Sleep sleepTime
        return Some error
    }

    let private asyncWithTimeout asy (timeout: int) (error: 'a) =
        Async.Choice [ asy; asyncSleep timeout error ]

    let private connectTcpClient (host: string) (port: int) (tcpClient: TcpClient) = async {
        let asyncConnect = async {
            do! tcpClient.ConnectAsync(host, port) |> Async.AwaitTask
            return Some tcpClient.Connected }
        match! asyncWithTimeout asyncConnect 1_000 false with
        | Some isConnected -> return Ok isConnected
        | None -> return Error "unexpected logic error in connectTcpClient"
        }

    let private writeTcpClient (outBytes: byte[]) (tcpClient: TcpClient) = async {
        let asyncWrite = async {
            let stream = tcpClient.GetStream()
            do! stream.WriteAsync(outBytes, 0, outBytes.Length) |> Async.AwaitTask
            do! stream.FlushAsync() |> Async.AwaitTask
            return Some (Ok ()) }
        match! asyncWithTimeout asyncWrite 10_000 (Error "timeout writing") with
        | Some isWrite -> return isWrite
        | None -> return Error "unexpected logic error in writeTcpClient"
        }

    let private readTcpClient (tcpClient: TcpClient) = async {
        let asyncRead = async {
            let inBytes: byte[] = Array.zeroCreate 1024
            let stream = tcpClient.GetStream()
            let! byteCount = stream.ReadAsync(inBytes, 0, inBytes.Length) |> Async.AwaitTask
            let bytesToReturn = inBytes.[ 0 .. byteCount - 1 ]
            return Some (Ok bytesToReturn) }
        match! asyncWithTimeout asyncRead 2_000 (Error "timeout reading reply") with
        | Some isRead ->
            match isRead with
            | Ok s -> return Ok s
            | Error error -> return Error error
        | None -> return Error "unexpected logic error in readTcpClient"
        }

    let sendReceiveBytes (host: string) (port: int) (bytesToSend: byte[]) = async {
        try
            use tcpClient = new TcpClient()
            match! connectTcpClient host port tcpClient with
            | Ok isConnected ->
                match isConnected with
                | true ->
                    match! writeTcpClient bytesToSend tcpClient with
                    | Ok () ->
                        let! gotData = readTcpClient tcpClient
                        match gotData with
                        | Ok result -> return Ok result
                        | Error error -> return Error error
                    | Error error -> return Error error
                | false -> return Error "Not connected."
            | Error error -> return Error error
        with
        | :? AggregateException as ex ->
            (* TODO ? *)
            return Error ex.Message
        | ex ->
            (*
            printfn "Exception in getStatus : %s" ex.Message
            *)
            return Error ex.Message
    }

    let sendReceiveText (host: string) (port: int) (textToSend: string) (encoding: Encoding) =
        encoding.GetBytes textToSend
        |> sendReceiveBytes host port
        |> Async.map (Result.map encoding.GetString)
...