Ограничение использования памяти при чтении файлов - PullRequest
5 голосов
/ 19 сентября 2010

Я новичок в Haskell и подумал, что это будет хорошим упражнением.У меня есть задание, в котором мне нужно прочитать файл в потоке A, обработать строки файла в потоках B_i, а затем вывести результаты в потоке C.

Я уже реализовал это, но одно из требованийв том, что мы не можем доверять тому, что весь файл помещается в память.Я надеялся, что ленивый ввод-вывод и сборщик мусора сделают это для меня, но, увы, использование памяти продолжает расти и расти.

Поток чтения (A) читает файл с readFile, который затем упаковывается в строкуномера и завернутые в Just.Эти сжатые строки затем записываются в Control.Concurrent.Chan.Каждый потребительский поток B имеет свой собственный канал.

Каждый потребитель читает свой собственный канал, когда у него есть данные, и, если регулярное выражение совпадает, он выводится на свой собственный соответствующий выходной канал, заключенный в Maybe (составленный из списков).

Принтер проверяет выходной канал каждого из потоков B.Если ни один из результатов (строка) не равен Nothing, строка печатается.Поскольку на этом этапе не должно быть ссылок на более старые строки, я подумал, что сборщик мусора сможет освободить эти строки, но, увы, я, похоже, ошибаюсь.

Файл .lhsздесь: http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs

Итак, вопрос в том, как ограничить использование памяти или разрешить сборщику мусора удалить строки.

Фрагменты в соответствии с запросом.Надеюсь, отступ не слишком сильно разрушен:)

data Global = Global {done :: MVar Bool, consumers :: Consumers}
type Done = Bool
type Linenum = Int
type Line = (Linenum, Maybe String)
type Output = MVar [Line]
type Input = Chan Line
type Consumers = MVar (M.Map ThreadId (Done, (Input, Output)))
type State a = ReaderT Global IO a


producer :: [Input] -> FilePath -> State ()
producer c p = do
  liftIO $ Main.log "Starting producer"
  d <- asks done
  f <- liftIO $ readFile p
  mapM_ (\l -> mapM_
    (liftIO . flip writeChan l) c)
    $ zip [1..] $ map Just $ lines f
  liftIO $ modifyMVar_ d (return . not)

printer :: State ()
printer = do
  liftIO $ Main.log "Starting printer"
  c <- (fmap (map (snd . snd) . M.elems)
    (asks consumers >>= liftIO . readMVar))
  uniq' c
  where head' :: Output -> IO Line
    head' ch = fmap head (readMVar ch)

    tail' = mapM_ (liftIO . flip modifyMVar_
        (return . tail))

    cont ch = tail' ch >> uniq' ch

    printMsg ch = readMVar (head ch) >>=
        liftIO . putStrLn . fromJust . snd . head

    cempty :: [Output] -> IO Bool
    cempty ch = fmap (any id)
        (mapM (fmap ((==) 0 . length) . readMVar ) ch)

    {- Return false unless none are Nothing -}
    uniq :: [Output] -> IO Bool
    uniq ch = fmap (any id . map (isNothing . snd))
        (mapM (liftIO . head') ch)

    uniq' :: [Output] -> State ()
    uniq' ch = do
      d <- consumersDone
      e <- liftIO $ cempty ch
      if not e
        then  do
          u <- liftIO $ uniq ch
          if u then cont ch else do
        liftIO $ printMsg ch
        cont ch
          else unless d $ uniq' ch

1 Ответ

6 голосов
/ 19 сентября 2010

Параллельное программирование не предлагает определенного порядка выполнения, если только вы сами его не используете с помощью mvars и т.п. Поэтому вполне вероятно, что поток производителя вставляет все / большинство строк в канале, прежде чем какой-либо потребитель прочитает их и передаст их. Другая архитектура, которая должна соответствовать требованиям, - это просто вызвать поток A, вызвать ленивый файл чтения и вставить результат в mvar. Затем каждый потребительский поток принимает mvar, читает строку, а затем заменяет mvar, прежде чем приступить к обработке строки. Даже в этом случае, если выходной поток не может идти в ногу, количество совпадающих строк, сохраненных в канале, может произвольно возрасти.

То, что у вас есть, это push-архитектура. Чтобы действительно заставить его работать в постоянном пространстве, думайте с точки зрения спроса. Найдите такой механизм, чтобы выходной поток сигнализировал потокам обработки, что они должны что-то сделать, и чтобы потоки обработки сообщали потоку считывателя, что они должны что-то делать.

Еще один способ сделать это - вместо этого иметь чаны ограниченного размера - чтобы поток считывателя блокировался, когда потоки процессора не догнали, и поэтому потоки процессора блокировались, когда поток вывода не догонял.

В целом, проблема на самом деле напоминает мне о широком тесте Тима Брея, хотя требования несколько иные. В любом случае это привело к широкому обсуждению наилучшего способа реализации многоядерного grep. Большая проблема заключалась в том, что проблема связана с вводом-выводом, и вы хотите, чтобы несколько потоков считывателей передавались по mmapped файлам.

Смотрите здесь больше, чем вы когда-либо захотите узнать: http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder

...