Продолжая мое повторное обнаружение бесплатных монад, я решил преобразовать небольшую программу, которую я пишу, в свободную монаду.
Программа собирает сообщения, полученные через MQTT с некоторого эфира.датчики у меня есть, и записывает результаты в базу данных.Я еще не реализовал какую-либо обработку ошибок, которая придет позже.
open System
open uPLibrary.Networking.M2Mqtt
open uPLibrary.Networking.M2Mqtt.Messages
open MySql.Data.MySqlClient
let connString = "server=...."
type agentMessage =
|Mqtt of string * DateTime
|ExitCode
|WaitForExit of AsyncReplyChannel<unit>
|NotifyDbReady
type agentState =
{
MqttList : (string * DateTime) list
ReplyOpt : AsyncReplyChannel<unit> option
DbReady : bool
}
type SensorReading =
{
SensorName:string
SensorDate:string
ReadingName:string
ReadingValue:string
}
let getSensorReading (s:string, dt:DateTime) =
match s.Split("@") with
|ar when ar.Length = 2 ->
ar.[1].Split("|")
|> Array.choose (fun r ->
match r.Split(":") with
|sar when sar.Length = 2 ->
{
SensorName = ar.[0]
SensorDate = dt.ToString("yyyy-MM-dd HH:mm:ss")
ReadingName = sar.[0]
ReadingValue = sar.[1]
} |> Some
|sar ->
printfn "Reading string [%s] has %i values" r sar.Length
None
) |> Some
|ar ->
printfn "Message [%s] has %i values" s ar.Length
None
let getSensorReadings lst = lst |> List.toArray |> Array.choose getSensorReading |> Array.collect id
let getLoadReadingsSql readings =
readings |> Array.map (fun r ->
sprintf "('%s', '%s', '%s', %s)" r.SensorName r.SensorDate r.ReadingName r.ReadingValue
) |> String.concat ", "
|> sprintf "insert into sensor_reading_staging(sensor_name, sensor_date, reading_name, reading_value) values %s"
|> (fun sql ->
MySqlHelper.ExecuteNonQuery(connString, sql) |> ignore
MySqlHelper.ExecuteNonQuery(connString, "call process_sensor_reading_staging()") |> ignore
)
let agent = MailboxProcessor.Start (fun inbox ->
let processSensorReadingList lst =
getSensorReadings lst
|> getLoadReadingsSql
inbox.Post NotifyDbReady
let rec messageLoop oldState = async{
let! msg = inbox.Receive()
let newState =
match msg with
|Mqtt (s, dt) when oldState.DbReady ->
async{(s, dt) :: oldState.MqttList |> processSensorReadingList } |> Async.Start
{oldState with MqttList = []; DbReady = false}
|Mqtt (s, dt) ->
{oldState with MqttList = (s, dt) :: oldState.MqttList}
|NotifyDbReady when oldState.MqttList.Length > 0 ->
async{oldState.MqttList |> processSensorReadingList } |> Async.Start
{oldState with MqttList = []; DbReady = false}
|NotifyDbReady ->
{oldState with DbReady = true}
|WaitForExit rep ->
{oldState with ReplyOpt = Some rep}
|ExitCode ->
oldState.ReplyOpt |> Option.map (fun rep -> rep.Reply()) |> ignore
{oldState with ReplyOpt = None}
return! messageLoop newState
}
messageLoop {MqttList = []; ReplyOpt = None; DbReady = true}
)
[<EntryPoint>]
let main argv =
let client = new MqttClient(argv.[0])
printfn "Connecting to broker"
client.Connect(Guid.NewGuid().ToString()) |> ignore
client.Subscribe([|"AirSensorReadings"|], [|MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE|]) |> ignore
use __ =
client.MqttMsgPublishReceived
|> Observable.subscribe(fun evArgs -> (evArgs.Message |> Array.map char |> String.Concat, DateTime.Now) |> Mqtt |> agent.Post)
agent.PostAndReply (fun rep -> WaitForExit rep)
//the agent never actually receives the exit message
0 // return an integer exit code
Я слежу за отличным постом в блоге Марка Симанна за это.
Так что, думаю, янужно взять каждую функцию с возвращаемым типом единицы измерения и передать ее интерпретатору.
Кажется, что это
- Получение сообщения от MQTT
- Запись вDB
- Пост агенту
Первые 2 кажутся достаточно простыми.Последнее заставляет меня немного почесать голову.AST, полученный в результате этого упражнения, по сути представляет собой последовательный список инструкций.В этом нет ничего параллельного.Но сообщения, которые отправляются агенту, могут появиться в любое время.Так как мне объединить эти 2 понятия?Может ли быть так, что каждое полученное сообщение генерирует новый AST, который в итоге приводит к чистой единице?Другими словами, только пункт 2 в списке выше всегда моделируется как инструкция.Или у меня были бы совершенно отдельные AST для агента и для работы, которая происходит при записи в БД?