Потоковая передача строковых данных с помощью F # Suave - PullRequest
0 голосов
/ 17 мая 2018

С Suave 2.4.0, поддерживающим TransferEncoding.chunked и HttpOutput.writeChunk Я написал приведенный ниже код для потоковой передачи данных по HTTP.

let sendStrings getStringsFromProducer : WebPart =
    Writers.setStatus HTTP_200 >=>
    TransferEncoding.chunked (fun conn -> socket {
        let refConn = ref conn

        for str in getStringsFromProducer do
            let! (_, conn) = (str |> stringToBytes |> HttpOutput.writeChunk) !refConn
            refConn := conn

        return! HttpOutput.writeChunk [||] !refConn
    }
)

Пока это работает, я подвергаю сомнению надежность использования ref и надеясь, что есть лучший способ сделать то же самое более функциональным способом.Есть ли лучший способ сделать это?Если я не могу изменить getStringsFromProducer?

Ответы [ 2 ]

0 голосов
/ 17 мая 2018

Я искал способ заменить ref / mutables в F # обычным способом, и, хотя я нашел решение, в вашем случае это может быть излишним. Похоже, что ссылка является локальной, которая обновляется только из одного потока, поэтому она, вероятно, довольно безопасна. Однако, если вы хотите заменить его, вот как я решил проблему:

type private StateMessage<'a> =
| Get of AsyncReplyChannel<'a>
| GetOrSet of 'a * AsyncReplyChannel<'a>
| GetOrSetResult of (unit -> 'a) * AsyncReplyChannel<'a>
| Set of 'a
| Update of ('a -> 'a) * AsyncReplyChannel<'a>

type Stateful<'a>(?initialValue: 'a) =
    let agent = MailboxProcessor<StateMessage<'a>>.Start
                <| fun inbox ->
                    let rec loop state =
                        async {
                            let! message = inbox.Receive()
                            match message with
                            | Get channel -> 
                                match state with
                                | Some value -> channel.Reply(value)
                                | None -> channel.Reply(Unchecked.defaultof<'a>)
                                return! loop state
                            | GetOrSet (newValue, channel) ->
                                match state with
                                | Some value ->
                                    channel.Reply(value)
                                    return! loop state
                                | None ->
                                    channel.Reply(newValue)
                                    return! loop (Some newValue)
                            | GetOrSetResult (getValue, channel) ->
                                match state with
                                | Some value ->
                                    channel.Reply(value)
                                    return! loop state
                                | None ->
                                    let newValue = getValue ()
                                    channel.Reply(newValue)
                                    return! loop (Some newValue)
                            | Set value -> 
                                return! loop (Some value)
                            | Update (update, channel) ->
                                let currentValue =
                                    match state with
                                    | Some value -> value
                                    | None -> Unchecked.defaultof<'a>
                                let newValue = update currentValue
                                channel.Reply(newValue)
                                return! loop (Some newValue)
                        }
                    loop initialValue

    let get () = agent.PostAndReply Get
    let asyncGet () = agent.PostAndAsyncReply Get
    let getOrSet value = agent.PostAndReply <| fun reply -> GetOrSet (value, reply)
    let asyncGetOrSet value = agent.PostAndAsyncReply <| fun reply -> GetOrSet (value, reply)
    let getOrSetResult getValue = agent.PostAndReply <| fun reply -> GetOrSetResult (getValue, reply)
    let asyncGetOrSetResult getValue = agent.PostAndAsyncReply <| fun reply -> GetOrSetResult (getValue, reply)
    let set value = agent.Post <| Set value
    let update f = agent.PostAndReply <| fun reply -> Update (f, reply)
    let asyncUpdate f = agent.PostAndAsyncReply <| fun reply -> Update (f, reply)

    member __.Get () = get ()
    member __.AsyncGet () = asyncGet ()
    member __.GetOrSet value = getOrSet value
    member __.AsyncGetOrSet value = asyncGetOrSet value
    member __.GetOrSetResult getValue = getOrSetResult getValue
    member __.AsyncGetOrSetResult getValue = asyncGetOrSetResult getValue
    member __.Set value = set value
    member __.Update f = update f
    member __.AsyncUpdate f = asyncUpdate f

Это в основном использует MailboxProcessor для сериализации обновлений в состояние, управляемое хвостовой рекурсивной функцией, аналогично второму примеру Томаса. Тем не менее, это позволяет вам вызывать Get / Set / Update способом, который больше похож на традиционное изменяемое состояние, даже при том, что он фактически не выполняет мутацию. Вы можете использовать это так:

let state = Stateful(0)
state.Get() |> printfn "%d"
state.Set(1)
state.Get() |> printfn "%d"
state.Update(fun x -> x + 1) |> printfn "%d"

Будет напечатано:

0
1 
2
0 голосов
/ 17 мая 2018

Я думаю, что вы не можете избежать всех мутаций в этом случае - написание кусков один за другим является довольно императивной операцией, а итерация по ленивой последовательности также требует (изменяемого) итератора, поэтому нет способа избежать всех мутаций.Я думаю, что ваша sendStrings функция отлично справляется с сокрытием мутации от потребителя и предоставляет хороший функциональный API.

Вы можете избежать использования ref ячеек и заменить их локальной изменяемой переменной, которая являетсянемного безопаснее - потому что изменяемая переменная не может выходить за пределы локальной области видимости:

TransferEncoding.chunked (fun conn -> socket {
    let mutable conn = conn
    for str in getStringsFromProducer do
        let! _, newConn = HttpOutput.writeChunk (stringToBytes str) conn
        conn <- newConn
    return! HttpOutput.writeChunk [||] conn
}

Вы можете избежать изменяемой переменной conn, используя рекурсию, но для этого нужно работать с IEnumerator<'T>, а не с хорошимfor цикл для итерации по последовательности, так что я думаю, что это на самом деле менее приятно, чем версия с изменяемой переменной:

TransferEncoding.chunked (fun conn -> socket {
    let en = getStringsFromProducer.GetEnumerator()
    let rec loop conn = socket {
      if en.MoveNext() then 
        let! _, conn = HttpOutput.writeChunk (stringToBytes en.Current) conn
        return! loop conn }
    do! loop conn
    return! HttpOutput.writeChunk [||] conn }) 
...