Если мы хотим разрешить Stream
s, которые живут в ResourceT
, мы можем обойтись без функций из streaming-wai (которые работают только для Stream
s на основе IO
) и вместо этого построить поверх функций, таких как responseStream
из network-wai :
import Control.Monad.Trans.Resource
import Network.Wai
import Streaming
import qualified Streaming.Prelude as S
import Data.ByteString.Builder (byteString, Builder)
streamingResponseR :: Stream (Of ByteString) (ResourceT IO) r
-> Status
-> ResponseHeaders
-> Response
streamingResponseR stream status headers =
responseStream status headers streamingBody
where
streamingBody writeBuilder flush =
let writer a =
do liftIO (writeBuilder (byteString a))
-- flushes for every produced bytestring, perhaps not optimal
liftIO flush
in runResourceT $ void $ S.effects $ S.for stream writer
streamingBody
имеет тип StreamingBody
, который фактически является синонимом типа для функции (Builder -> IO ()) -> IO () -> IO ()
, которая принимает обратный вызов записи и обратный вызов сброса в качестве параметров и использует их для записи ответа, используя некоторый источник данных, который находится в области. (Обратите внимание, что эти обратные вызовы предоставляются WAI , а не пользователем.)
В нашем случае источником данных является Stream
, который живет в ResourceT
. Нам нужно поднять обратные вызовы write и flush (которые живут в IO
), используя liftIO
, также не забывайте вызывать runResourceT
, чтобы возвратить простое действие IO
в конце.
Что, если мы захотим сбросить ответ только после того, как накопленная длина испущенных тестовых строк достигнет некоторого предела?
Нам потребуется функция (не реализованная здесь) для создания деления при каждом достижении лимита:
breaks' :: Monad m
=> Int
-> Stream (Of ByteString) m r
-> Stream (Stream (Of ByteString) m) m r
breaks' breakSize = undefined
И затем мы могли бы вставить действие очистки между каждой группой, используя intercalates
, перед записью потока:
streamingBodyFrom :: Stream (Of ByteString) (ResourceT IO) ()
-> Int
-> StreamingBody
streamingBodyFrom stream breakSize writeBuilder flush =
let writer a = liftIO (writeBuilder (byteString a))
flusher = liftIO flush
broken = breaks' breakSize stream
in runResourceT . S.mapM_ writer . S.intercalates flusher $ broken