Почему моя модифицированная (в реальном мире haskell) реализация Mapreduce завершается неудачно с «слишком много открытых файлов» - PullRequest
3 голосов
/ 04 апреля 2011

Я реализую программу на Haskell, которая сравнивает каждую строку файла с каждой другой строкой в ​​файле. Для простоты давайте предположим, что структура данных, представленная одной линией, является просто Int, а мой алгоритм - это квадрат расстояния. Это я бы реализовал следующим образом:

--My operation
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Applying my operation simply on a file
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
    where infiniteList :: Int->Int-> [Int]
          infiniteList i j = (i + j) : infiniteList j (i+j)

К сожалению, полный файл будет храниться в памяти. Чтобы предотвратить возможные нехватки памяти в очень больших файлах, я хотел бы искать файловый курсор в начале файла при каждой рекурсии «allDistances».

В книге "Real World Haskell" дается реализация mapreduce с функцией разделения файла на куски (глава 24, доступно здесь ). Я изменил функцию чанкинга, чтобы вместо деления всего файла на чанки возвращать столько чанков, сколько строк, причем каждый чанк представляет один элемент

tails . lines. readFile

Полная реализация (плюс предыдущая область кода)

import qualified Data.ByteString.Lazy.Char8 as Lazy
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
import System.IO

--Applying my operation using mapreduce on a very big file
sumOfDistancesOnFile :: FilePath -> IO Int
sumOfDistancesOnFile path = chunkedFileOperation chunkByLinesTails (distancesUsingMapReduce) path

distancesUsingMapReduce :: [Lazy.ByteString] -> Int
distancesUsingMapReduce = mapReduce rpar (distancesFirstToTail . lexer)
                                rpar combineDistances
              where lexer :: Lazy.ByteString -> [Int]
                    lexer chunk = map (read . Lazy.unpack) (Lazy.lines chunk)

distancesOneToMany :: Int -> [Int] -> Int
distancesOneToMany one many = combineDistances $ map (distance one) many

distancesFirstToTail :: [Int] -> Int
distancesFirstToTail s = 
              if not (null s)
              then distancesOneToMany (head s) (tail s)
              else 0
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
      -> (a -> b)   -- map function
      -> Strategy c -- evaluation strategy for reduction
      -> ([b] -> c) -- reduce function
      -> [a]        -- list to map over
      -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
      mapResult `pseq` reduceResult
      where mapResult    = parMap mapStrat mapFunc input
            reduceResult = reduceFunc mapResult `using` reduceStrat


--Working with (file)chunks:
data ChunkSpec = CS{
    chunkOffset :: !Int64
    , chunkLength :: !Int64
    } deriving (Eq,Show)

chunkedFileOperation ::   (NFData a)=>
            (FilePath-> IO [ChunkSpec])
       ->   ([Lazy.ByteString]-> a)
       ->   FilePath
       ->   IO a
chunkedFileOperation chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedRead chunkCreator path
    let r = funcOnChunks chunks
    (rdeepseq r `seq` return r) `finally` mapM_ hClose handles

chunkedRead ::  (FilePath -> IO [ChunkSpec])
        ->  FilePath
        ->  IO ([Lazy.ByteString], [Handle])
chunkedRead chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
    h <- openFile path ReadMode
    hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
    chunk <- Lazy.take (chunkLength spec) `liftM` Lazy.hGetContents h
    return (chunk,h)

-- returns set of chunks representing  tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h-> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
            let newOffset = offset + chunkSize
            hSeek h AbsoluteSeek (fromIntegral newOffset)
            let findNewline lineSeekOffset = do
                eof <- hIsEOF h
                if eof
                    then return [CS offset (totalSize - offset)]
                    else do
                        bytes <- Lazy.hGet h 4096
                        case Lazy.elemIndex '\n' bytes of
                            Just n -> do
                                nextChunks <- findChunks (lineSeekOffset + n + 1)
                                return (CS offset (totalSize-offset):nextChunks)
                            Nothing -> findNewline (lineSeekOffset + Lazy.length bytes)
            findNewline newOffset
        findChunks 0

К сожалению, в файле большего размера (например, 2000 строк) версия mapreduce вызывает исключение:
* Исключение: getCurrentDirectory: ресурс исчерпан (слишком много открытых файлов)

Мне немного стыдно, что я не могу отладить программу самостоятельно, но я знаю только, как отлаживать код java / c #. И я также не знаю, как можно правильно протестировать разбиение и чтение файлов. Я ожидаю, что проблема не будет частью самой функции mapreduce, поскольку аналогичная версия без mapreduce также вызывает исключение. В этой попытке я заставил chunkedFileOperation принять как операцию для одного чанка, так и функцию «уменьшить», которую он применил напрямую.

Кстати, я бегу
HaskellPlatform 2011.2.0 на Mac OS X 10.6.7 (снежный барс)
со следующими пакетами:
bytestring 0.9.1.10
параллельно 3.1.0.1
и я квалифицируюсь как самоучка новичка / программиста на свежем языке на Haskell

Ответы [ 2 ]

4 голосов
/ 04 апреля 2011

Вы используете ленивый ввод-вывод, поэтому файлы, открытые с помощью readFile, не закрываются своевременно. Вам нужно будет найти решение, которое регулярно закрывает файлы (например, через строгий ввод-вывод или повторный ввод-вывод).

0 голосов
/ 04 апреля 2011

Эта ошибка в точности означает, что она говорит: у вашего процесса слишком много открытых файлов.ОС накладывает произвольное ограничение на количество файлов (или каталогов), которые процесс может одновременно читать.Смотрите вашу ulimit(1) справочную страницу и / или ограничивайте количество картографов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...