Бесконечный поток действенных действий - PullRequest
4 голосов
/ 05 апреля 2019

Я бы хотел разобрать бесконечный поток байтов в бесконечный поток данных на Haskell.Каждый байт считывается из сети, поэтому они заключаются в монаду ввода-вывода.

Конкретнее, у меня есть бесконечный поток типа [IO(ByteString)].С другой стороны, у меня есть чистая функция синтаксического анализа parse :: [ByteString] -> [Object] (где Object - это тип данных Haskell)

Есть ли способ подключить мой бесконечный поток монады к моей функции синтаксического анализа?

Например, можно ли написать функцию типа [IO(ByteString)] -> IO [ByteString], чтобы я мог использовать свою функцию parse в монаде?

1 Ответ

8 голосов
/ 06 апреля 2019

Проблема

Вообще говоря, для того, чтобы действия ввода-вывода были правильно упорядочены и вели себя предсказуемо, каждое действие должно быть полностью выполнено перед выполнением следующего действия.В do-блоке это означает, что это работает:

main = do
    sequence (map putStrLn ["This","action","will","complete"])
    putStrLn "before we get here"

, но, к сожалению, это не сработает, если важно было выполнить окончательное действие ввода-вывода:

dontRunMe = do
    putStrLn "This is a problem when an action is"
    sequence (repeat (putStrLn "infinite"))
    putStrLn "<not printed>"

Так что, даже еслиsequence может быть специализировано для правильной сигнатуры типа:

sequence :: [IO a] -> IO [a]

она не работает должным образом в бесконечном списке действий ввода-вывода.У вас не будет проблем , определяя такую ​​последовательность:

badSeq :: IO [Char]
badSeq = sequence (repeat (return '+'))

, но любая попытка выполнить действие ввода-вывода (например, при попытке напечатать заголовок результирующего списка) будет зависать:

main = (head <$> badSeq) >>= print

Неважно, нужна ли вам только часть результата.Вы не получите ничего из монады ввода-вывода, пока не будет завершен весь sequence (поэтому «никогда», если список бесконечен).

Решение «Ленивый ввод-вывод»

Есливы хотите получить данные от частично выполненного действия ввода-вывода, вам нужно четко об этом сказать и использовать страшно звучащий аварийный люк Haskell, unsafeInterleaveIO.Эта функция выполняет действие ввода-вывода и «откладывает» его так, чтобы оно фактически не выполнялось до тех пор, пока не будет запрошено значение.

Причина, по которой это небезопасно в общем, заключается в том, что действие ввода-вывода, которое имеет смысл сейчас, может означатьчто-то другое, если на самом деле выполняется в более позднее времяВ качестве простого примера, действие ввода-вывода, которое усекает / удаляет файл, имеет совсем другой эффект, если оно выполняется до против после записи обновленного содержимого файла!

В любом случаето, что вы хотели бы сделать здесь, это написать ленивую версию sequence:

import System.IO.Unsafe (unsafeInterleaveIO)

lazySequence :: [IO a] -> IO [a]
lazySequence [] = return []  -- oops, not infinite after all
lazySequence (m:ms) = do
  x <- m
  xs <- unsafeInterleaveIO (lazySequence ms)
  return (x:xs)

Ключевым моментом здесь является то, что при выполнении действия lazySequence infstream оно на самом деле выполнит только первое действие;оставшиеся действия будут заключены в отложенное действие ввода-вывода, которое не будет действительно выполняться до тех пор, пока не будут запрошены второй и последующие элементы возвращаемого списка.

Это работает для поддельных действий ввода-вывода:

> take 5 <$> lazySequence (repeat (return ('+'))
"+++++"
>

(где, если вы замените lazySequence на sequence, он зависнет).Он также работает для реальных операций ввода-вывода:

> lns <- lazySequence (repeat getLine)
<waits for first line of input, then returns to prompt>
> print (head lns)
<prints whatever you entered>
> length (head (tail lns))  -- force next element
<waits for second line of input>
<then shows length of your second line before prompt>
>

В любом случае, с этим определением lazySequence и типами:

parse :: [ByteString] -> [Object]
input :: [IO ByteString]

у вас не должно возникнуть проблем с написанием:

outputs :: IO [Object]
outputs = parse <$> lazySequence inputs

, а затем использовать его лениво, как вы хотите:

main = do
    objs <- outputs
    mapM_ doSomethingWithObj objs

Использование Conduit

Несмотря на то, что вышеописанный механизм ленивого ввода-вывода довольно прост и понятен, ленивый ввод-вывод потерял свою популярностьдля производственного кода из-за проблем с управлением ресурсами, хрупкости в отношении утечек места (когда небольшое изменение в вашем коде увеличивает объем памяти) и проблем с обработкой исключений.

Одним из решений является conduitбиблиотека.Другой это pipes.Обе они являются тщательно разработанными потоковыми библиотеками, которые могут поддерживать бесконечные потоки.

Для conduit, если у вас была функция разбора, которая создала один объект на строку байтов, например:

parse1 :: ByteString -> Object
parse1 = ...

, а затем:

inputs :: [IO ByteString]
inputs = ...

useObject :: Object -> IO ()
useObject = ...

канал будет выглядеть примерно так:

import Conduit

main :: IO ()
main = runConduit $  mapM_ yieldM inputs
                  .| mapC parse1
                  .| mapM_C useObject

Учитывая, что ваша функция разбора имеет сигнатуру:

parse :: [ByteString] -> [Object]

Я почти уверен, что вы можетене интегрируйте это непосредственно с кабелепроводом (или, по крайней мере, ни в коем случае так, чтобы это не отбросило бы все преимущества использования кабелепровода).Вам нужно было бы переписать его, чтобы он был удобен для использования в канале, как он потребляет байтовые строки и созданные объекты.

...