Как правильно использовать TryScan в F # - PullRequest
5 голосов
/ 03 февраля 2011

Я пытался найти пример о том, как использовать TryScan, но не нашел ни одного, не могли бы вы мне помочь?

Что я хотел бы сделать (довольно упрощенный пример): у меня естьMailboxProcessor, который принимает два типа сообщений.

  • Первый GetState возвращает текущее состояние.GetState сообщения отправляются довольно часто

  • Другой UpdateState очень дорогой (отнимает много времени) - например, загрузка чего-либо из Интернета, а затем обновление состояния соответственно.UpdateState вызывается очень редко.

Моя проблема в том, что сообщения GetState блокируются и ждут, пока не будут отправлены предыдущие UpdateState.Вот почему я пытался использовать TryScan для обработки всех GetState сообщений, но безуспешно.

Мой пример кода:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                mbox.TryScan(fun m ->
                    match m with 
                    | GetState(chnl) -> 
                        printfn "G processing TryScan"
                        chnl.Reply(state)
                        Some(async { return! loop state})
                    | _ -> None
                ) |> ignore

                let! msg = mbox.Receive()
                match msg with
                | UpdateState ->
                    printfn "U processing"
                    // something very time consuming here...
                    async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                    return! loop (state+1)
                | GetState(chnl) ->
                    printfn "G processing"
                    chnl.Reply(state)
                    return! loop state
             }
             loop 0
)

[async { for i in 1..10 do 
          printfn " U"
          mbox.Post(UpdateState)
          async { do! Async.Sleep(200) } |> Async.RunSynchronously
};
async { // wait some time so that several `UpdateState` messages are fired
        async { do! Async.Sleep(500) } |> Async.RunSynchronously
        for i in 1..20 do 
          printfn "G"
          printfn "%d" (mbox.PostAndReply(GetState))
}] |> Async.Parallel |> Async.RunSynchronously

Если вы попытаетесь запустить код, выувидим, что сообщение GetState почти не обрабатывается, так как ожидает результата.С другой стороны, UpdateState - это только огонь и забыл, таким образом блокируя эффективное получение состояния.

Редактировать

Текущее решение, которое работает для меня, вот это:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                let! res = mbox.TryScan((function
                    | GetState(chnl) -> Some(async {
                            chnl.Reply(state)
                            return state
                        })
                    | _ -> None
                ), 5)

                match res with
                | None ->
                    let! msg = mbox.Receive()
                    match msg with
                        | UpdateState ->
                            async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                            return! loop (state+1)
                        | _ -> return! loop state
                | Some n -> return! loop n
             }
             loop 0
)

Реакции на комментарии: идея с другими MailboxProcessor или ThreadPool, которая выполняет UpdateState параллельно, хороша, но в данный момент она мне не нужна.Все, что я хотел сделать, это обработать все GetState сообщений и после этого остальные.Мне все равно, что во время обработки UpdateState агент блокируется.

Я покажу вам, в чем была проблема на выходе:

// GetState messages are delayed 500 ms - see do! Async.Sleep(500)
// each UpdateState is sent after 200ms
// each GetState is sent immediatelly! (not real example, but illustrates the problem)
 U            200ms   <-- issue UpdateState
U processing          <-- process UpdateState, it takes 1sec, so other 
 U            200ms       5 requests are sent; sent means, that it is
 U            200ms       fire-and-forget message - it doesn't wait for any result
                          and therefore it can send every 200ms one UpdateState message
G                     <-- first GetState sent, but waiting for reply - so all 
                          previous UpdateState messages have to be processed! = 3 seconds
                          and AFTER all the UpdateState messages are processed, result
                          is returned and new GetState can be sent. 
 U            200ms
 U            200ms       because each UpdateState takes 1 second
 U            200ms
U processing
 U
 U
 U
 U
U processing
G processing          <-- now first GetState is processed! so late? uh..
U processing          <-- takes 1sec
3
G
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
G processing          <-- after MANY seconds, second GetState is processed!
10
G
G processing
// from this line, only GetState are issued and processed, because 
// there is no UpdateState message in the queue, neither it is sent

Ответы [ 3 ]

4 голосов
/ 03 февраля 2011

Я не думаю, что метод TryScan поможет вам в этом сценарии. Позволяет указать время ожидания, которое будет использоваться при ожидании сообщений. Как только какое-либо сообщение получено, оно начинает обрабатывать сообщение (игнорируя тайм-аут).

Например, если вы хотите подождать какое-то конкретное сообщение, но выполнять какую-то другую проверку каждую секунду (во время ожидания), вы можете написать:

let loop () = async {
  let! res = mbox.TryScan(function
    | ImportantMessage -> Some(async { 
          // process message 
          return 0
        })
    | _ -> None)
  match res with
  | None -> 
       // perform some check & continue waiting
       return! loop ()
  | Some n -> 
       // ImportantMessage was received and processed 
}

Что вы можете сделать, чтобы избежать блокировки процессора почтовых ящиков при обработке сообщения UpdateState? Процессор почтовых ящиков (логически) однопоточный - вы, вероятно, не хотите отменять обработку сообщения UpdateState, поэтому лучше всего начинать обработку в фоновом режиме и ждать, пока обработка не завершится. Код, который обрабатывает UpdateState, может затем отправить сообщение обратно в почтовый ящик (например, UpdateStateCompleted).

Вот эскиз, как это может выглядеть:

let rec loop (state) = async {
  let! msg = mbox.Receive()
  match msg with
  | GetState(repl) -> 
      repl.Reply(state)
      return! scanning state
  | UpdateState -> 
      async { 
        // complex calculation (runs in parallel)
        mbox.Post(UpdateStateCompleted newState) }
      |> Async.Start
  | UpdateStateCompleted newState ->
      // Received new state from background workflow
      return! loop newState }

Теперь, когда фоновая задача выполняется параллельно, вам нужно быть осторожным с изменяемым состоянием. Кроме того, если вы отправите UpdateState сообщений быстрее, чем сможете их обработать, у вас будут проблемы. Это можно исправить, например, игнорируя или ставя в очередь запросы, когда вы уже обрабатываете предыдущий.

3 голосов
/ 04 февраля 2011

НЕ ИСПОЛЬЗУЙТЕ TRYSCAN !!!

К сожалению, функция TryScan в текущей версии F # нарушается двумя способами.Во-первых, весь смысл в том, чтобы указать время ожидания, но реализация фактически не соблюдает его.В частности, неактуальные сообщения сбрасывают таймер.Во-вторых, как и в случае с другой функцией Scan, очередь сообщений проверяется под блокировкой, которая не позволяет другим потокам публиковать сообщения во время сканирования, которое может быть сколь угодно долгим.Следовательно, сама функция TryScan имеет тенденцию блокировать параллельные системы и может даже вводить взаимоблокировки, потому что код вызывающей стороны оценивается внутри блокировки (например, отправка из аргумента функции в Scan или TryScan может заблокировать агент, когдакод под блокировками блокировки, ожидающими получения блокировки, под которой он уже находится).

Я использовал TryScan в раннем прототипе моего рабочего кода, и это не вызвало никаких проблем.Тем не менее, мне удалось создать архитектуру, и полученная архитектура была на самом деле лучше.По сути, я с нетерпением жду Receive всех сообщений и фильтрую, используя собственную локальную очередь.

2 голосов
/ 03 февраля 2011

Как упоминал Томас, MailboxProcessor является однопоточным. Вам понадобится другой MailboxProcessor для запуска обновлений в отдельном потоке от получателя состояния.

#nowarn "40"

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | UpdateState

let runner_UpdateState = MailboxProcessor.Start(fun mbox ->
    let rec loop = async {
        let! state = mbox.Receive()
        printfn "U start processing %d" !state
        // something very time consuming here...
        do! Async.Sleep 100
        printfn "U done processing %d" !state
        state := !state + 1
        do! loop
    }
    loop
)

let mbox = MailboxProcessor.Start(fun mbox ->
    // we need a mutiple state if another thread can change it at any time
    let state = ref 0

    let rec loop = async {
        let! msg = mbox.Receive()

        match msg with
        | UpdateState -> runner_UpdateState.Post state
        | GetState chnl -> chnl.Reply !state

        return! loop 
    }
    loop)

[
    async { 
        for i in 1..10 do 
            mbox.Post UpdateState
            do! Async.Sleep 200
    };
    async { 
        // wait some time so that several `UpdateState` messages are fired
        do! Async.Sleep 1000

        for i in 1..20 do 
            printfn "G %d" (mbox.PostAndReply GetState)
            do! Async.Sleep 50
    }
] 
|> Async.Parallel 
|> Async.RunSynchronously
|> ignore

System.Console.ReadLine() |> ignore

выход:

U start processing 0
U done processing 0
U start processing 1
U done processing 1
U start processing 2
U done processing 2
U start processing 3
U done processing 3
U start processing 4
U done processing 4
G 5
U start processing 5
G 5
U done processing 5
G 5
G 6
U start processing 6
G 6
G 6
U done processing 6
G 7
U start processing 7
G 7
G 7
U done processing 7
G 8
G U start processing 8
8
G 8
U done processing 8
G 9
G 9
U start processing 9
G 9
U done processing 9
G 9
G 10
G 10
G 10
G 10

Вы также можете использовать ThreadPool.

open System.Threading

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | SetState of int
    | UpdateState

let mbox = MailboxProcessor.Start(fun mbox ->
    let rec loop state = async {
        let! msg = mbox.Receive()

        match msg with
        | UpdateState -> 
            ThreadPool.QueueUserWorkItem((fun obj -> 
                let state = obj :?> int

                printfn "U start processing %d" state
                Async.Sleep 100 |> Async.RunSynchronously
                printfn "U done processing %d" state
                mbox.Post(SetState(state + 1))

                ), state)
            |> ignore
        | GetState chnl -> 
            chnl.Reply state
        | SetState newState ->
            return! loop newState
        return! loop state
    }
    loop 0)

[
    async { 
        for i in 1..10 do 
            mbox.Post UpdateState
            do! Async.Sleep 200
    };
    async { 
        // wait some time so that several `UpdateState` messages are fired
        do! Async.Sleep 1000

        for i in 1..20 do 
            printfn "G %d" (mbox.PostAndReply GetState)
            do! Async.Sleep 50
    }
] 
|> Async.Parallel 
|> Async.RunSynchronously
|> ignore

System.Console.ReadLine () |> игнорировать

...