Как совместить агентный параллелизм со свободной монадой? - PullRequest
0 голосов
/ 01 марта 2019

Продолжая мое повторное обнаружение бесплатных монад, я решил преобразовать небольшую программу, которую я пишу, в свободную монаду.

Программа собирает сообщения, полученные через MQTT с некоторого эфира.датчики у меня есть, и записывает результаты в базу данных.Я еще не реализовал какую-либо обработку ошибок, которая придет позже.

open System
open uPLibrary.Networking.M2Mqtt
open uPLibrary.Networking.M2Mqtt.Messages
open MySql.Data.MySqlClient


let connString = "server=...."

type agentMessage =
    |Mqtt of string * DateTime
    |ExitCode
    |WaitForExit of AsyncReplyChannel<unit>
    |NotifyDbReady

type agentState =
    {
        MqttList : (string * DateTime) list
        ReplyOpt : AsyncReplyChannel<unit> option
        DbReady : bool
    }

type SensorReading =
    {
        SensorName:string
        SensorDate:string
        ReadingName:string
        ReadingValue:string
    }

let getSensorReading (s:string, dt:DateTime) =
    match s.Split("@") with
    |ar when ar.Length = 2 ->
        ar.[1].Split("|")
        |> Array.choose (fun r ->
            match r.Split(":") with
            |sar when sar.Length = 2 ->
                {
                    SensorName = ar.[0]
                    SensorDate = dt.ToString("yyyy-MM-dd HH:mm:ss")
                    ReadingName = sar.[0]
                    ReadingValue = sar.[1]
                } |> Some
            |sar ->
                printfn "Reading string [%s] has %i values" r sar.Length
                None
        ) |> Some
    |ar ->
        printfn "Message [%s] has %i values" s ar.Length
        None

let getSensorReadings lst = lst |> List.toArray |> Array.choose getSensorReading |> Array.collect id

let getLoadReadingsSql readings =
    readings |> Array.map (fun r ->
        sprintf "('%s', '%s', '%s', %s)" r.SensorName r.SensorDate r.ReadingName r.ReadingValue
    ) |> String.concat ", "
    |> sprintf "insert into sensor_reading_staging(sensor_name, sensor_date, reading_name, reading_value) values %s"
    |> (fun sql -> 
        MySqlHelper.ExecuteNonQuery(connString, sql) |> ignore
        MySqlHelper.ExecuteNonQuery(connString, "call process_sensor_reading_staging()") |> ignore
    )

let agent = MailboxProcessor.Start (fun inbox ->
    let processSensorReadingList lst =
        getSensorReadings lst
        |> getLoadReadingsSql

        inbox.Post NotifyDbReady

    let rec messageLoop oldState = async{
        let! msg = inbox.Receive()
        let newState =
            match msg with
            |Mqtt (s, dt) when oldState.DbReady ->
                async{(s, dt) :: oldState.MqttList |> processSensorReadingList } |> Async.Start
                {oldState with MqttList = []; DbReady = false}
            |Mqtt (s, dt) ->
                {oldState with MqttList = (s, dt) :: oldState.MqttList}
            |NotifyDbReady when oldState.MqttList.Length > 0 ->
                async{oldState.MqttList |> processSensorReadingList } |> Async.Start
                {oldState with MqttList = []; DbReady = false}
            |NotifyDbReady ->
                {oldState with DbReady = true}
            |WaitForExit rep ->
                {oldState with ReplyOpt = Some rep}
            |ExitCode ->
                oldState.ReplyOpt |> Option.map (fun rep -> rep.Reply()) |> ignore
                {oldState with ReplyOpt = None}

        return! messageLoop newState
    }
    messageLoop {MqttList = []; ReplyOpt = None; DbReady = true}
)

[<EntryPoint>]
let main argv =
    let client = new MqttClient(argv.[0])
    printfn "Connecting to broker"
    client.Connect(Guid.NewGuid().ToString()) |> ignore

    client.Subscribe([|"AirSensorReadings"|], [|MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE|]) |> ignore

    use __ =
        client.MqttMsgPublishReceived
        |> Observable.subscribe(fun evArgs -> (evArgs.Message |> Array.map char |> String.Concat, DateTime.Now) |> Mqtt |> agent.Post)

    agent.PostAndReply (fun rep -> WaitForExit rep)

    //the agent never actually receives the exit message
    0 // return an integer exit code

Я слежу за отличным постом в блоге Марка Симанна за это.

Так что, думаю, янужно взять каждую функцию с возвращаемым типом единицы измерения и передать ее интерпретатору.

Кажется, что это

  1. Получение сообщения от MQTT
  2. Запись вDB
  3. Пост агенту

Первые 2 кажутся достаточно простыми.Последнее заставляет меня немного почесать голову.AST, полученный в результате этого упражнения, по сути представляет собой последовательный список инструкций.В этом нет ничего параллельного.Но сообщения, которые отправляются агенту, могут появиться в любое время.Так как мне объединить эти 2 понятия?Может ли быть так, что каждое полученное сообщение генерирует новый AST, который в итоге приводит к чистой единице?Другими словами, только пункт 2 в списке выше всегда моделируется как инструкция.Или у меня были бы совершенно отдельные AST для агента и для работы, которая происходит при записи в БД?

1 Ответ

0 голосов
/ 03 марта 2019

Хорошо, я придумала, как мне кажется, достойный способ объединить эти 2 понятия.Следующий код самодостаточен.

open System

type SensorReading =
    {
        SensorName:string
        SensorDate:string
        ReadingName:string
        ReadingValue:string
    }

type AgentState =
    {
        MqttList : (string * DateTime) list
        DbReady : bool
    }

type AgentMessage =
    |Mqtt of string * DateTime
    |NotifyDbReady
    |ExitCode

type Instruction<'a> =
    |ReceiveMessage of (AgentMessage -> 'a)
    |InsertReadings of SensorReading [] * 'a
    |PostMessage of AgentMessage * 'a

type Program<'a> =
    |Free of Instruction<Program<'a>>
    |Pure of 'a

let mapI (f:'a -> 'b) : Instruction<'a> -> Instruction<'b> = function
    |ReceiveMessage next -> ReceiveMessage (next >> f)
    |InsertReadings (x, next) -> InsertReadings (x, next |> f)
    |PostMessage (x, next) -> PostMessage (x, next |> f)

let rec bind f = function
    |Free instruction -> instruction |> mapI (bind f) |> Free
    |Pure x -> f x

let map f = bind (f >> Pure)

let receiveMessage = Pure |> ReceiveMessage |> Free
let insertReadings lst = (lst, Pure ()) |> InsertReadings |> Free
let postMessage msg = (msg, Pure ()) |> PostMessage |> Free

type programBuilder() =
    member this.Bind(x, f) = bind f x
    member this.Return x = Pure x
    member this.ReturnFrom x = x
    member this.Zero() = Pure ()

let program = programBuilder()

let getSensorReading (s:string, dt:DateTime) =
    match s.Split("@") with
    |ar when ar.Length = 2 ->
        ar.[1].Split("|")
        |> Array.choose (fun r ->
            match r.Split(":") with
            |sar when sar.Length = 2 ->
                {
                    SensorName = ar.[0]
                    SensorDate = dt.ToString("yyyy-MM-dd HH:mm:ss")
                    ReadingName = sar.[0]
                    ReadingValue = sar.[1]
                } |> Some
            |_ -> None
        ) |> Some
    |_ -> None

let getSensorReadings lst = lst |> List.toArray |> Array.choose getSensorReading |> Array.collect id

let rec processMessage oldState = program {
    let! msg = receiveMessage
    match msg with
    |Mqtt (s, dt) when oldState.DbReady ->
        do! (s, dt) :: oldState.MqttList |> getSensorReadings |> insertReadings
        return! {oldState with MqttList = []; DbReady = false} |> processMessage
    |Mqtt (s, dt) ->
        return! {oldState with MqttList = (s, dt) :: oldState.MqttList} |> processMessage
    |NotifyDbReady when oldState.MqttList.Length > 0 ->
        do! oldState.MqttList |> getSensorReadings |> insertReadings
        return! {oldState with MqttList = []; DbReady = false} |> processMessage
    |NotifyDbReady ->
        return! {oldState with DbReady = true} |> processMessage
    |ExitCode -> return ()
}

let startAgent insertToDb = 
    MailboxProcessor.Start (fun inbox ->
    let rec interpret = function
        |Pure x -> x
        |Free (ReceiveMessage next) ->
            inbox.Receive() |> Async.RunSynchronously |> next |> interpret
        |Free (InsertReadings (lst, next)) ->
            lst |> insertToDb (fun () -> inbox.Post NotifyDbReady)
            next |> interpret
        |Free (PostMessage (msg,next)) ->
            inbox.Post msg
            next |> interpret

    async{{MqttList = []; DbReady = true} |> processMessage |> interpret}
)
let printReadingsToConsole notify readings =
    async{
        readings |> Array.map (fun r ->
            sprintf "('%s', '%s', '%s', %s)" r.SensorName r.SensorDate r.ReadingName r.ReadingValue
        ) |> String.concat "\n" |> printfn "%s"
        do! Async.Sleep 5000
        notify()
    } |> Async.Start

[<EntryPoint>]
let main argv =

    let agent = startAgent printReadingsToConsole

    async{
        let r = new Random()
        let getRandomMessage() =
            let sensorName = r.Next() |> sprintf "Sensor%i"
            let r1 = r.Next() |> sprintf "Reading%i"
            let r2 = r.Next() |> sprintf "Reading%i"
            let v1 = r.NextDouble()
            let v2 = r.NextDouble()
            sprintf "%s@%s:%f|%s:%f" sensorName r1 v1 r2 v2

        let rec sendTestMessages () = async{
            printfn "Sending Test Message"
            (getRandomMessage(), DateTime.Now) |> Mqtt |> agent.Post
            do! Async.Sleep 500
            do! sendTestMessages()
        }
        do! sendTestMessages()
    } |> Async.Start

    Console.ReadKey() |> ignore
    agent.Post ExitCode

    0 // return an integer exit code

Идея состоит в том, что процессор почтового ящика становится интерпретатором.Вся логика, которая происходит при получении сообщения, определяется как AST, и интерпретатор выбирает, когда сделать ответ на сообщение асинхронным.край.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...