Por que minha implementação modificada (haskell do mundo real) Mapreduce falha com “Muitos arquivos abertos”
Estou implementando um programa haskell que compara cada linha de um arquivo com a outra linha do arquivo. Por simplicidade, vamos assumir que a estrutura de dados representada por uma linha é apenas um Int, e meu algoritmo é a distância ao quadrado. Eu implementaria da seguinte maneira:
--My operation
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
combineDistances :: [Int] -> Int
combineDistances = sum
--Applying my operation simply on a file
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
allDistances _ = 0
--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
where infiniteList :: Int->Int-> [Int]
infiniteList i j = (i + j) : infiniteList j (i+j)
Infelizmente, o arquivo completo será mantido na memória. Para evitar possíveis exceções de falta de memória em arquivos muito grandes, gostaria de procurar o filecursor de volta ao início do arquivo, a cada recursão de 'allDistances'.
No livro "Real World Haskell" é fornecida uma implementação de mapreduce, com a função de dividir um arquivo em pedaços (capítulo 24, disponível emAqu). Modifiquei a função de chunking para, em vez de dividir o arquivo completo em chunks, retornar tantos chunks quanto linhas com cada chunk representando um elemento de
tails . lines. readFile
A implementação completa é (mais a região do código anterior)
import qualified Data.ByteString.Lazy.Char8 as Lazy
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
import System.IO
--Applying my operation using mapreduce on a very big file
sumOfDistancesOnFile :: FilePath -> IO Int
sumOfDistancesOnFile path = chunkedFileOperation chunkByLinesTails (distancesUsingMapReduce) path
distancesUsingMapReduce :: [Lazy.ByteString] -> Int
distancesUsingMapReduce = mapReduce rpar (distancesFirstToTail . lexer)
rpar combineDistances
where lexer :: Lazy.ByteString -> [Int]
lexer chunk = map (read . Lazy.unpack) (Lazy.lines chunk)
distancesOneToMany :: Int -> [Int] -> Int
distancesOneToMany one many = combineDistances $ map (distance one) many
distancesFirstToTail :: [Int] -> Int
distancesFirstToTail s =
if not (null s)
then distancesOneToMany (head s) (tail s)
else 0
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
-> (a -> b) -- map function
-> Strategy c -- evaluation strategy for reduction
-> ([b] -> c) -- reduce function
-> [a] -- list to map over
-> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
mapResult `pseq` reduceResult
where mapResult = parMap mapStrat mapFunc input
reduceResult = reduceFunc mapResult `using` reduceStrat
--Working with (file)chunks:
data ChunkSpec = CS{
chunkOffset :: !Int64
, chunkLength :: !Int64
} deriving (Eq,Show)
chunkedFileOperation :: (NFData a)=>
(FilePath-> IO [ChunkSpec])
-> ([Lazy.ByteString]-> a)
-> FilePath
-> IO a
chunkedFileOperation chunkCreator funcOnChunks path = do
(chunks, handles)<- chunkedRead chunkCreator path
let r = funcOnChunks chunks
(rdeepseq r `seq` return r) `finally` mapM_ hClose handles
chunkedRead :: (FilePath -> IO [ChunkSpec])
-> FilePath
-> IO ([Lazy.ByteString], [Handle])
chunkedRead chunkCreator path = do
chunks <- chunkCreator path
liftM unzip . forM chunks $ \spec -> do
h <- openFile path ReadMode
hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
chunk <- Lazy.take (chunkLength spec) `liftM` Lazy.hGetContents h
return (chunk,h)
-- returns set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
bracket (openFile path ReadMode) hClose $ \h-> do
totalSize <- fromIntegral `liftM` hFileSize h
let chunkSize = 1
findChunks offset = do
let newOffset = offset + chunkSize
hSeek h AbsoluteSeek (fromIntegral newOffset)
let findNewline lineSeekOffset = do
eof <- hIsEOF h
if eof
then return [CS offset (totalSize - offset)]
else do
bytes <- Lazy.hGet h 4096
case Lazy.elemIndex '\n' bytes of
Just n -> do
nextChunks <- findChunks (lineSeekOffset + n + 1)
return (CS offset (totalSize-offset):nextChunks)
Nothing -> findNewline (lineSeekOffset + Lazy.length bytes)
findNewline newOffset
findChunks 0
nfelizmente, em um arquivo maior (por exemplo, 2000 linhas), a versão mapreduce gera uma exceção:
* Exceção: getCurrentDirectory: recurso esgotado (muitos arquivos abertos)
Estou um pouco envergonhado por não conseguir depurar o programa sozinho, mas só sei como depurar código java / c #. E também não sei como o bloco e a leitura do arquivo podem ser testados corretamente. Espero que o problema não faça parte da própria função mapreduce, pois uma versão semelhante sem mapreduce também gera uma exceção. Nessa tentativa, o chunkedFileOperation aceitou a operação de um pedaço e a função 'reduzir', aplicada diretament
BTW, eu estou correndo
HaskellPlatform 2011.2.0 no Mac OS X 10.6.7 (snow leopard)
com os seguintes pacotes:
bytestring 0.9.1.10
parallel 3.1.0.1
e eu me qualifico como um iniciante autodidata / programador haskell novo