F # MailboxProcessor ограничить параллелизм - PullRequest
0 голосов
/ 02 мая 2018

Я новичок в F # и пытаюсь поэкспериментировать с MailboxProcessor, чтобы убедиться, что изменения состояния выполняются изолированно.

Короче говоря, я отправляю действия (неизменяемые объекты, описывающие изменение состояния) в MailboxProcessor, в рекурсивной функции я читаю сообщение и генерирую новое состояние (т.е. добавляю элемент в коллекцию в примере ниже) и отправляю это состояние до следующей рекурсии.

open System

type AppliationState =
    {
        Store : string list
    }
    static member Default = 
        {
            Store = List.empty
        }
    member this.HandleAction (action:obj) =
        match action with
        | :? string as a -> { this with Store = a :: this.Store }
        | _ -> this

type Agent<'T> = MailboxProcessor<'T>     

[<AbstractClass; Sealed>]
type AppHolder private () =
    static member private Processor = Agent.Start(fun inbox ->
        let rec loop (s : AppliationState) =
            async {
                let! action = inbox.Receive()
                let s' = s.HandleAction action
                Console.WriteLine("{s: " + s.Store.Length.ToString() + " s': " + s'.Store.Length.ToString())
                return! loop s'
                }
        loop AppliationState.Default)

    static member HandleAction (action:obj) =
        AppHolder.Processor.Post action

[<EntryPoint>]
let main argv =
    AppHolder.HandleAction "a"
    AppHolder.HandleAction "b"
    AppHolder.HandleAction "c"
    AppHolder.HandleAction "d"

    Console.ReadLine()
    0 // return an integer exit code

Ожидаемый результат:

s: 0 s': 1
s: 1 s': 2
s: 2 s': 3
s: 3 s': 4  

Что я получаю:

s: 0 s': 1
s: 0 s': 1
s: 0 s': 1
s: 0 s': 1

Читая документацию по MailboxProcessor и прибегая к ней, я пришел к выводу, что это очередь сообщений, обрабатываемая «однопоточным», вместо этого похоже, что все они обрабатываются параллельно.

Я тут совсем с поля?

Ответы [ 2 ]

0 голосов
/ 02 мая 2018

Проблема в том, что вы думаете, AppHolder.Processor будет каждый раз одним и тем же объектом, но на самом деле это каждый раз новый MailboxProcessor. Я изменил ваш код AppHolder на следующий:

[<AbstractClass; Sealed>]
type AppHolder private () =
    static member private Processor =
        printfn "Starting..."
        Agent.Start(fun inbox ->
        let rec loop (s : AppliationState) =
            async {
                let! action = inbox.Receive()
                let s' = s.HandleAction action
                printfn "{s: %A s': %A}" s s'
                return! loop s'
                }
        loop AppliationState.Default)

    static member HandleAction (action:obj) =
        AppHolder.Processor.Post action

Единственное, что я сделал, - это упростил вызов Console.WriteLine для использования printfn и %A, чтобы получить больше подробностей отладки, и добавил один вызов printfn "Starting...", который будет выполнен непосредственно перед сборкой MailboxProcessor и началось. И результат, который я получил, был:

Starting...
Starting...
Starting...
Starting...
{s: {Store = [];} s': {Store = ["b"];}}
{s: {Store = [];} s': {Store = ["d"];}}
{s: {Store = [];} s': {Store = ["c"];}}
{s: {Store = [];} s': {Store = ["a"];}}

Обратите внимание, что строка printfn "Starting..." была выполнена четыре раза.

Это ловит много новичков в F #: ключевое слово member определяет свойство , а не поле. Каждый раз, когда вы оцениваете свойство, тело этого свойства оценивается заново. Поэтому каждый раз, когда вы получаете доступ к AppHolder.Processor, вы получаете новый MailboxProcessor. Подробнее см. https://docs.microsoft.com/en-us/dotnet/fsharp/language-reference/members/properties.

То, что вы, вероятно, хотели, было следующее:

[<AbstractClass; Sealed>]
type AppHolder private () =
    static let processor =
        printfn "Starting..."
        Agent.Start(fun inbox ->
            // ...
        )

    static member HandleAction (action:obj) =
        processor.Post action
0 голосов
/ 02 мая 2018

Я думаю, что проблема должна быть в вашей реализации HandleAction. Я реализовал следующее, и он выдает ожидаемый результат.

open System

type ApplicationState =
    {
        Items: int list
    }
    static member Default = {Items = []}
    member this.HandleAction x = {this with Items = x::this.Items}

type Message = Add of int

let Processor = MailboxProcessor<Message>.Start(fun inbox ->
    let rec loop (s : ApplicationState) =
        async {
            let! (Add action) = inbox.Receive()
            let s' = s.HandleAction action
            Console.WriteLine("s: " + s.Items.Length.ToString() + " s': " + s'.Items.Length.ToString())
            return! loop s'
        }
    loop ApplicationState.Default)

Processor.Post (Add 1)
Processor.Post (Add 2)
Processor.Post (Add 3)
Processor.Post (Add 4)


// OUTPUT
// s: 0 s': 1
// s: 1 s': 2
// s: 2 s': 3
// s: 3 s': 4

EDIT

После просмотра обновленного примера кода я считаю, что правильным решением F # было бы просто переключить тип AppHolder с класса на модуль. Обновленный код будет выглядеть так:

open System

type AppliationState =
    {
        Store : string list
    }
    static member Default = 
        {
            Store = List.empty
        }
    member this.HandleAction (action:obj) =
        match action with
        | :? string as a -> { this with Store = a :: this.Store }
        | _ -> this

type Agent<'T> = MailboxProcessor<'T>     

module AppHolder =
    let private processor = Agent.Start(fun inbox ->
        let rec loop (s : AppliationState) =
            async {
                let! action = inbox.Receive()
                let s' = s.HandleAction action
                Console.WriteLine("{s: " + s.Store.Length.ToString() + " s': " + s'.Store.Length.ToString())
                return! loop s'
            }
        loop AppliationState.Default)

    let handleAction (action:obj) =
        processor.Post action


AppHolder.handleAction "a"
AppHolder.handleAction "b"
AppHolder.handleAction "c"
AppHolder.handleAction "d"

Выводит тот же результат, что и раньше:

{s: 0 s': 1
{s: 1 s': 2
{s: 2 s': 3
{s: 3 s': 4
...