Я новичок в Haskell и подумал, что это будет хорошим упражнением.У меня есть задание, в котором мне нужно прочитать файл в потоке A, обработать строки файла в потоках B_i, а затем вывести результаты в потоке C.
Я уже реализовал это, но одно из требованийв том, что мы не можем доверять тому, что весь файл помещается в память.Я надеялся, что ленивый ввод-вывод и сборщик мусора сделают это для меня, но, увы, использование памяти продолжает расти и расти.
Поток чтения (A) читает файл с readFile
, который затем упаковывается в строкуномера и завернутые в Just.Эти сжатые строки затем записываются в Control.Concurrent.Chan
.Каждый потребительский поток B имеет свой собственный канал.
Каждый потребитель читает свой собственный канал, когда у него есть данные, и, если регулярное выражение совпадает, он выводится на свой собственный соответствующий выходной канал, заключенный в Maybe (составленный из списков).
Принтер проверяет выходной канал каждого из потоков B.Если ни один из результатов (строка) не равен Nothing, строка печатается.Поскольку на этом этапе не должно быть ссылок на более старые строки, я подумал, что сборщик мусора сможет освободить эти строки, но, увы, я, похоже, ошибаюсь.
Файл .lhsздесь: http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs
Итак, вопрос в том, как ограничить использование памяти или разрешить сборщику мусора удалить строки.
Фрагменты в соответствии с запросом.Надеюсь, отступ не слишком сильно разрушен:)
data Global = Global {done :: MVar Bool, consumers :: Consumers}
type Done = Bool
type Linenum = Int
type Line = (Linenum, Maybe String)
type Output = MVar [Line]
type Input = Chan Line
type Consumers = MVar (M.Map ThreadId (Done, (Input, Output)))
type State a = ReaderT Global IO a
producer :: [Input] -> FilePath -> State ()
producer c p = do
liftIO $ Main.log "Starting producer"
d <- asks done
f <- liftIO $ readFile p
mapM_ (\l -> mapM_
(liftIO . flip writeChan l) c)
$ zip [1..] $ map Just $ lines f
liftIO $ modifyMVar_ d (return . not)
printer :: State ()
printer = do
liftIO $ Main.log "Starting printer"
c <- (fmap (map (snd . snd) . M.elems)
(asks consumers >>= liftIO . readMVar))
uniq' c
where head' :: Output -> IO Line
head' ch = fmap head (readMVar ch)
tail' = mapM_ (liftIO . flip modifyMVar_
(return . tail))
cont ch = tail' ch >> uniq' ch
printMsg ch = readMVar (head ch) >>=
liftIO . putStrLn . fromJust . snd . head
cempty :: [Output] -> IO Bool
cempty ch = fmap (any id)
(mapM (fmap ((==) 0 . length) . readMVar ) ch)
{- Return false unless none are Nothing -}
uniq :: [Output] -> IO Bool
uniq ch = fmap (any id . map (isNothing . snd))
(mapM (liftIO . head') ch)
uniq' :: [Output] -> State ()
uniq' ch = do
d <- consumersDone
e <- liftIO $ cempty ch
if not e
then do
u <- liftIO $ uniq ch
if u then cont ch else do
liftIO $ printMsg ch
cont ch
else unless d $ uniq' ch