Как я могу использовать Data.Concurrent.mergeIO? - PullRequest
2 голосов
/ 10 февраля 2010

Я вижу две функции mergeio и nmergeio в Data.Concurrent, но я не могу найти примеры их работы.

Кто-нибудь работал с этим раньше? Я надеюсь, что смогу использовать их, чтобы получить функцию типа "parMapM".

1 Ответ

2 голосов
/ 10 февраля 2010
import Control.Concurrent (mergeIO, nmergeIO)

main = do
  xs <- mergeIO (map (*2) [1..10])
                (map (+3) [100..110])
  print xs

  xs <- nmergeIO [ map (\x->x*x) [1..10]
                 , map (\x->x+x) [1..10]
                 ]
  print $ maximum xs

Выход:

[2,4,103,6,104,8,105,10,106,12,107,14,108,16,109,18,110,20,111,112,113]
100

Внутренний порядок может отличаться в зависимости от того, насколько быстро каждый поток передает результаты.

Писать parMapM немного сложно, но результат хороший:

import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Word
import System.IO

import qualified Data.ByteString as BS

main :: IO ()
main = do
  xs <- parMapM (reverse . show) $ replicate 4 (readFromNet 5)
  print xs

Мы будем читать из /dev/urandom как standin:

readFromNet :: Int -> IO [Word8]
readFromNet n = do
  h <- openFile "/dev/urandom" ReadMode 
  let go :: Int -> IO [Word8]
      go 0 = return []
      go remaining = do s <- BS.head <$> BS.hGet h 1
                        ss <- go (remaining-1)
                        return (s:ss)
  go n

Наконец кровавые биты:

parMapM :: (a -> b) -> [IO [a]] -> IO [b]
parMapM f as = do
  kids <- newMVar []
  answers <- atomically $ newTVar []
  forM_ as $ \ a ->
    do mvar <- newEmptyMVar
       curkids <- takeMVar kids
       putMVar kids (mvar:curkids)
       let ax = do xs <- a
                   atomically $ do sofar <- readTVar answers
                                   writeTVar answers (sofar ++ xs)
       forkIO (ax `finally` putMVar mvar ())
  waitForChildren kids
  atomically $ map f <$> readTVar answers
  where
    waitForChildren kids = do ks <- takeMVar kids
                              case ks of
                                [] -> return ()
                                m:ms -> do
                                  putMVar kids ms
                                  takeMVar m
                                  waitForChildren kids

Это работает, когда дети записывают свои ответы на TVar, в то время как основной поток ожидает, пока дети сообщат о своем завершении.

К сожалению, результаты «коренастые», поскольку readFromNet не знает о проблемах связи, поэтому мы получаем все значения из заданного потока одновременно. Если вы не возражаете запачкать их руки, вы можете сделать это следующим образом:

main :: IO ()
main = do
  let threads = 3
      nbytes  = 10
      total   = nbytes * threads
  byte <- newEmptyMVar
  let thr = forkIO $ readFromNetwork nbytes byte
      go 0 = return []
      go n = do b <- takeMVar byte
                bs <- go (n-1)
                return (b:bs)
  sequence_ $ replicate threads thr
  values <- map (reverse . show) <$> go total
  print values

Тогда рабочий выглядит как

readFromNetwork :: Int -> MVar Word8 -> IO ()
readFromNetwork n var = do
  -- or something...
  h <- openFile "/dev/urandom" ReadMode 
  let go 0 = return ()
      go remaining = do s <- BS.hGet h 1
                        putMVar var (BS.head s)
                        go (remaining-1)
  go n
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...