Как объединить входные данные «один к одному» и «один ко многим»: выходные отношения в канале? - PullRequest
0 голосов
/ 24 января 2019

Я уже некоторое время борюсь с этой проблемой, хотя, если честно, я многое узнал о Conduit, поскольку ранее я в основном использовал консервированные примеры с несколькими исключениями.

основная проблема сформулирована следующим образом для каналов A, B и C;A .| B (A подается в B) и A .| C, и, наконец, мне нужно иметь функцию, которая принимает B и C и производит промежуточный канал, вызывает его Merge B C, чтобы я мог сделать (Merge B C) .| D.Мой опыт работы с языками, не относящимися к Haskell, с FRP / потоковыми библиотеками позволяет предположить, что существует несколько различных способов выполнить «слияние» (например, семейство операций zip «sample on» - генерировать только новые элементы для D, когда один или нескольковыбранные входные каналы имеют новое значение и т. д.).Я думаю, что моя проблема в том, чтобы понять, как это сделать в Conduit, если это поддерживается.

Еще более конкретно для моей конкретной проблемы сегодня, B имеет отношение 1: 1 с A, тогда как C имеет отношение много: 1 с A и, в конечном итоге, DЯ хочу, чтобы повторяющиеся элементы B сочетались с соответствующими элементами C: если a~b и a~c для a в A, b в B и c в C, затем (b,c) подается в D.Так что я смог использовать ZipSink и тот факт, что это действительно разумное место для снижения производительности (помимо производительности, на которую я не смотрел).Конечно, как и ожидалось, getZipSink ничего не знает об отношениях один-ко-многим и о том, как их решать;он имеет широко определенное поведение zip, чтобы просто циклически проходить через входные потоки, пока все входные потоки не будут циклически пройдены один раз.

Я полагаю, что один из способов сделать это может состоять в том, чтобы каким-то образом изменить мой поток «один ко многим» впоток один-к-одному, делая сгиб в нечто вроде списка.Но тогда я должен был бы распаковать это позже вне контекста канала.На данный момент, я просто хочу спросить, каков рекомендуемый способ (ы).

Мой фактический код выглядит так (A это sourceDirectoryDeep, B это processFileName, C is processCSV, а D is (вроде бы, я полагаю) getZipSink):

retrieveSmaXtec :: Path Abs Dir -> IO (Vector SxRecord)
retrieveSmaXtec sxDir = do
  rows <- sourceDirectoryDeep False (fromAbsDir sxDir)
    .| getZipSink (combine <$> ZipSink processFileName <*> ZipSink processCSV )
    & runConduitRes
  print rows
  rows & fmap fromRow & catMaybes & return
  where
    combine :: (Vector (MapRow Text)) -> (Vector (MapRow Text)) -> (Vector (MapRow Text))
    combine v1 v2 = (uncurry DM.union) <$> (zip v1 v2)
    processCSV :: (MonadResource m, MonadThrow m, PrimMonad m)=>
      ConduitT FilePath Void m (Vector ((MapRow Text)))
    processCSV = mapMC (liftIO . DTIO.readFile)
      .| intoCSV defCSVSettings
      .| sinkVector
    processFileName :: (MonadResource m, MonadThrow m, PrimMonad m) =>
      ConduitT FilePath Void m (Vector ((MapRow Text)))
    processFileName = mapC go
      .| sinkVector
      where
        go :: FilePath -> MapRow Text
        go fp = takeFileName fp
          & takeWhile (/= '.')
          & splitOn "_"
          & fmap Txt.pack
          & zip colNames
          & DM.fromList
        colNames = [markKey, idKey]

Импорт (некоторые из которых могут быть посторонними):

import           Conduit
import qualified Data.Conduit.Combinators       as DCC
import           Data.CSV.Conduit
import           Data.Function                  ((&))
import           Data.List.Split                (splitOn)
import           Data.Map                       as DM
import           Data.Text                      (Text)
import qualified Data.Text                      as Txt
import qualified Data.Text.IO                   as DTIO
import           Data.Vector                    (Vector)
import           Path
import           System.FilePath.Posix
...