Ответ
Самым простым решением было бы использовать Client
и Server
, как предложено в комментариях к danidiaz, поскольку pipes
не имеет встроенной поддержки циклических каналов и будетбыть невероятно трудным, если не невозможным, сделать это правильно. Это в основном потому, что нам нужно обрабатывать случаи, когда число await
s не совпадает с числом yield
s.
Редактировать: Я добавил раздел о проблемахс другим ответом. См. Раздел «Другая проблемная альтернатива»
Редактировать 2: Ниже я добавил менее проблемное возможное решение. См. Раздел «Возможное решение»
Проблемная альтернатива
Однако его можно смоделировать с помощью структуры Proxy
(с Client
и Server
) и аккуратная функция generalize
, которая превращает однонаправленный Pipe
в двунаправленный Proxy
.
generalize f x0
+-----------+ +---------------------+
| | | |
| | x <======================== x
a ==> f ==> b becomes | |
| | a ==> f ==> b
| | | | |
+-----|-----+ +----------|----------+
v v
r r
Теперь мы можем использовать //>
и >\\
, чтобы заткнуть концы и сделать поток циклическим:
loop :: Monad m => Pipe a a m r -> a -> Effect m r
loop p x0 = pure >\\ generalize p x0 //> pure
, который имеет эту форму
loop f
a
+-----|-----+
| | |
/====<=======/===<========\
| | | |
\=> a ==> f ==> a ==/
| |
+-----|-----+
v
r
Как вы можетевидите, мы должны ввести начальное значение для a
. Это потому, что нет гарантии, что канал не будет await
, прежде чем он даст, что заставило бы его ждать вечно.
Обратите внимание, однако, что этот выбросит данные , еслипередаются yield
s несколько раз до await
ing, поскольку обобщение внутренне реализовано с помощью монады состояния, которая сохраняет последнее значение при выдаче и извлекает последнее значение при ожидании.
Использование (проблематичной идеи)
Чтобы использовать его с вашими каналами, просто скомпонуйте их и передайте loop
:
runEffect $ loop (f >-> g)
Но, пожалуйста, не используйте его, так как он случайным образом выбрасывает данные, если выне осторожны
Еще одна проблемная альтернатива
Вы также можете создать лениво бесконечную цепочку каналов, как предложил mingmingrr
infiniteChain :: Functor m => Pipe a a m r -> Producer a m r
infiniteChain f = infiniteChain >-> f
Это решает проблему отброшенных / дублированных значений,но есть несколько других проблем. Во-первых, ожидание первого перед сдачей вызовет бесконечный цикл с бесконечным использованием памяти, но это уже учтено в ответе mingmingr.
Другая, более сложная проблема, заключается в том, что каждое действие перед соответствующим выходом дублируется. один раз за каждого ждут. Мы можем увидеть это, если мы изменим их пример, чтобы регистрировать происходящее:
import Pipes
import qualified Pipes.Prelude as P
f :: Monad m => Pipe Int Int m r
f = P.map (* 2)
g :: Monad m => Int -> Pipe Int Int m ()
g 0 = return ()
g n = do
lift . putStrLn $ "Awaiting. n = " ++ show n
x <- await
lift . putStrLn $ "Got: x = " ++ show x ++ " and n = "++ show n ;
yield (x + 1)
g (n - 1)
cyclic' :: Monad m => Int -> Producer Int m Int
cyclic' input = let pipe = (yield input >> pipe) >-> f >-> g 6 in pipe
Теперь при запуске runEffect (cyclic' 0 >-> P.print)
будет напечатано следующее:
Awaiting. n = 6
Got: x = 0 and n = 6
1
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
7
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
15
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
31
Awaiting. n = 1
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
Got: x = 62 and n = 1
63
Как вы можете видеть, длякаждый await
, мы перезапускаем все до соответствующего yield
. Более конкретно, ожидание запускает новую копию канала, пока он не достигнет доходности. Когда мы снова ждем, копия будет работать до следующего выхода снова, и если она вызовет await
во время этого, она создаст еще одну копию и будет запускать ее до первого выхода, и т. Д.
Этоозначает, что в лучшем случае мы получаем O(n^2)
вместо линейной производительности (и используя O(n)
вместо O(1)
памяти), так как мы повторяем все для каждого действия. В худшем случае, например, если мы читаем или записываем файл, мы можем получить совершенно неверные результаты, поскольку мы повторяем побочные эффекты.
Возможное решение
Если вам действительно нужноиспользуйте Pipe
s и не можете использовать request
/ respond
вместо этого, и вы уверены, что ваш код никогда не будет await
больше (или раньше) его yield
s (или будет иметь хорошее значение по умолчанию, чтобы дать егов этих случаях), мы могли бы опираться на мою предыдущую попытку создать решение, которое по крайней мере обрабатывает случай, когда yield
больше, чем вы await
.
Хитрость заключается в добавлении буфера к реализацииgeneralize
, поэтому избыточные значения сохраняются, а не выбрасываются. Мы также можем сохранить дополнительный аргумент в качестве значения по умолчанию, когда буфер пуст.
import Pipes.Lift (evalStateP)
import Control.Monad.Trans.State.Strict (state, modify)
import qualified Data.Sequence
generalize' :: Monad m => Pipe a b m r -> x -> Proxy x a x b m r
generalize' p x0 = evalStateP Seq.empty $ up >\\ hoist lift p //> dn
where
up () = do
x <- lift $ state (takeHeadDef x0)
request x
dn a = do
x <- respond a
lift $ modify (Seq.|> x)
takeHeadDef :: a -> Seq.Seq a -> (a, Seq.Seq a)
takeHeadDef x0 xs = (foldr const x0 xs, Seq.drop 1 xs)
Если мы теперь подключим это к нашему определению loop
, мы решим проблему отбрасывания избыточных значений (за счет хранения памяти в буфере). Он также предотвращает дублирование любых значений, отличных от значения по умолчанию, и использует значение по умолчанию, только когда буфер пуст.
loop' :: Monad m => a -> Pipe a a m r -> Effect m r
loop' x0 p = pure >\\ generalize' p x0 //> pure
Если мы хотим, чтобы await
ing до yield
ing был ошибкой, мыможет просто дать error
в качестве значения по умолчанию: loop' (error "Await without yield") somePipe
.
TL; DR
Использовать Client
и Server
из Pipes.Core
. Это решит вашу проблему и не вызовет кучу странных ошибок.
Если это невозможно, то мой раздел «Возможное решение» с измененной версией generalize
в большинстве случаев должен выполнить эту работу.