Сеть разделяет поток байтов на произвольные ByteString
фрагменты.С помощью кода выше, эти ByteString
чанки будут сопоставлены с чанами Text
, а каждый чанк Text
будет проанализирован как decimal
.Однако строка десятичных цифр, представляющих один decimal
, может быть разбита на два (или более) Text
фрагмента.Кроме того, как вы понимаете, использование decimal
возвращает вам остаток от Text
, который не был проанализирован как часть decimal
, который вы пытаетесь вернуть обратно во входной поток.
Обе эти проблемы можно решить с помощью Data.Conduit.Attoparsec. conduitParserEither
с Data.Attoparsec.Text.decimal
.Обратите внимание, что недостаточно просто разобрать decimal
;вам также потребуется обработать какой-либо разделитель между decimal
с.
Также невозможно слить Source
с CL.map
, поскольку сигнатура типа CL.map
имеет значение
map :: Monad m => (a -> b) -> Conduit a m b
Функция, которую вы передаете map
, получает возможность преобразовывать каждый вход a
в один b
, а не поток b
.Для этого вы можете использовать awaitForever
, но вам нужно преобразовать Source
в общее Producer
с toProducer
, чтобы типы соответствовали.
Однако в вашемкод, вы пытаетесь отправить ошибки синтаксического анализа ниже по потоку как ByteString
, но вывод mySource
как Int
, что является ошибкой типа.Вы должны предоставить поток ByteString
в обоих случаях;успешный случай синтаксического анализа может вернуть Conduit
, сделанный путем слияния других Conduit
, до тех пор, пока он не закончится с выводом ByteString
:
...
$= (let f (Left err) = yield $ S8.pack $ "Error: " ++ show err
f (Right (_, i)) = toProducer (mySource i) $= someOtherConduit
in awaitForever f)
, где someOtherConduit
погружает Int
из mySource
и источников ByteString
.
someOtherConduit :: Monad m => Conduit Int m ByteString
Наконец, я полагаю, вы хотели подключить snk
в конце канала вместо src
.