Я уже некоторое время борюсь с этой проблемой, хотя, если честно, я многое узнал о Conduit, поскольку ранее я в основном использовал консервированные примеры с несколькими исключениями.
основная проблема сформулирована следующим образом для каналов A
, B
и C
;A .| B
(A
подается в B
) и A .| C
, и, наконец, мне нужно иметь функцию, которая принимает B и C и производит промежуточный канал, вызывает его Merge B C
, чтобы я мог сделать (Merge B C) .| D
.Мой опыт работы с языками, не относящимися к Haskell, с FRP / потоковыми библиотеками позволяет предположить, что существует несколько различных способов выполнить «слияние» (например, семейство операций zip «sample on» - генерировать только новые элементы для D
, когда один или нескольковыбранные входные каналы имеют новое значение и т. д.).Я думаю, что моя проблема в том, чтобы понять, как это сделать в Conduit, если это поддерживается.
Еще более конкретно для моей конкретной проблемы сегодня, B
имеет отношение 1: 1 с A
, тогда как C
имеет отношение много: 1 с A
и, в конечном итоге, D
Я хочу, чтобы повторяющиеся элементы B
сочетались с соответствующими элементами C
: если a~b
и a~c
для a
в A
, b
в B
и c
в C
, затем (b,c)
подается в D
.Так что я смог использовать ZipSink
и тот факт, что это действительно разумное место для снижения производительности (помимо производительности, на которую я не смотрел).Конечно, как и ожидалось, getZipSink
ничего не знает об отношениях один-ко-многим и о том, как их решать;он имеет широко определенное поведение zip, чтобы просто циклически проходить через входные потоки, пока все входные потоки не будут циклически пройдены один раз.
Я полагаю, что один из способов сделать это может состоять в том, чтобы каким-то образом изменить мой поток «один ко многим» впоток один-к-одному, делая сгиб в нечто вроде списка.Но тогда я должен был бы распаковать это позже вне контекста канала.На данный момент, я просто хочу спросить, каков рекомендуемый способ (ы).
Мой фактический код выглядит так (A
это sourceDirectoryDeep
, B
это processFileName
, C
is processCSV
, а D
is (вроде бы, я полагаю) getZipSink
):
retrieveSmaXtec :: Path Abs Dir -> IO (Vector SxRecord)
retrieveSmaXtec sxDir = do
rows <- sourceDirectoryDeep False (fromAbsDir sxDir)
.| getZipSink (combine <$> ZipSink processFileName <*> ZipSink processCSV )
& runConduitRes
print rows
rows & fmap fromRow & catMaybes & return
where
combine :: (Vector (MapRow Text)) -> (Vector (MapRow Text)) -> (Vector (MapRow Text))
combine v1 v2 = (uncurry DM.union) <$> (zip v1 v2)
processCSV :: (MonadResource m, MonadThrow m, PrimMonad m)=>
ConduitT FilePath Void m (Vector ((MapRow Text)))
processCSV = mapMC (liftIO . DTIO.readFile)
.| intoCSV defCSVSettings
.| sinkVector
processFileName :: (MonadResource m, MonadThrow m, PrimMonad m) =>
ConduitT FilePath Void m (Vector ((MapRow Text)))
processFileName = mapC go
.| sinkVector
where
go :: FilePath -> MapRow Text
go fp = takeFileName fp
& takeWhile (/= '.')
& splitOn "_"
& fmap Txt.pack
& zip colNames
& DM.fromList
colNames = [markKey, idKey]
Импорт (некоторые из которых могут быть посторонними):
import Conduit
import qualified Data.Conduit.Combinators as DCC
import Data.CSV.Conduit
import Data.Function ((&))
import Data.List.Split (splitOn)
import Data.Map as DM
import Data.Text (Text)
import qualified Data.Text as Txt
import qualified Data.Text.IO as DTIO
import Data.Vector (Vector)
import Path
import System.FilePath.Posix