Какой лучший способ написать какой-нибудь семафороподобный код на Haskell? - PullRequest
3 голосов
/ 27 августа 2011

Предположим, у меня есть функция f, которая принимает целочисленный аргумент. f может не заканчиваться на некоторых аргументах, но его результат одинаково ценен. (Для конкретности аргумент может быть начальным числом для генератора случайных чисел, который передается в решатель SAT.)

Я хочу использовать параллелизм, вызывать f 1, f 2, f 3 и т. Д. И возвращаться после завершения первого. Итак, в каждом потоке должен быть запущен код, который выглядит как

comp <- start_proc (f 1)
wait(comp || anyDone) -- wait for _either_ of these signals to be true
if comp then
    set anyDone = True

Какой самый простой способ сделать это? Оператор AMB приходит на ум, но мне нужно запускать все процессы одновременно (например, на 24- или 80-ядерном компьютере). (Решения для распределенных вычислений были бы еще лучше.) Поверхностный взгляд на вики-страницу AMB предполагает, что она может не поддерживать непрерывные процессы?

тест

В настоящее время я не получаю ответы для работы с тем, что я хочу. Я думаю, что это, вероятно, больше связано с тем, как я создаю процессы, чем с чем-либо еще.

Определение

runProc (x:xs) =
    createProcess (proc x xs) >>= \(_, _, _, h) -> waitForProcess h

Тогда я хочу участвовать в гонках runProc ["zsh", "-c", "sleep 3"] и runProc ["ls"]. Я немного изменил ответ Томаса, но он не сработал.

raceL :: [IO α] -> IO α
raceL ops = do
    mv <- newEmptyMVar
    tids <- forM ops (\op -> forkIO (op >>= putMVar mv))
    answer <- takeMVar mv
    mapM_ killThread tids
    return answer

Компиляция с -threaded и работа с +RTS -N (у меня 4-ядерный компьютер), похоже, не помогает.

Ответы [ 4 ]

8 голосов
/ 28 августа 2011

Почему бы не просто MVar и forkIO?

import Control.Concurrent
import Control.Concurrent.MVar
import System.Environment
import Control.Monad

main = do
  mv <- newEmptyMVar
  [nrThreads] <- liftM (map read) getArgs
  tids <- replicateM nrThreads (forkIO $ operation mv)
  answer <- takeMVar mv
  mapM_ killThread tids

operation :: MVar Int -> IO ()
operation mv = putMVar mv 5

Это приведет к созданию nrThreads легких нитей. Как только один поток закончен, он должен поместить ответ в предоставленный MVar. Все остальные потоки будут уничтожены основным потоком. Никакого явного опроса не требуется, поскольку GHC RTS перенесет main, как только MVar станет непустым.

4 голосов
/ 29 августа 2011

Вместо amb рассмотрим unamb ! Он предоставляет несколько хороших примитивов для гоночных вычислений, как чистых, так и нечистых. Например:

Prelude Data.Unamb> unamb (last [1..]) 32
32
Prelude Data.Unamb> race (threadDelay 5000000 >> return 3) readLn
Prelude Data.Unamb Control.Concurrent> race (threadDelay 5000000 >> return 3) readLn
56
56
Prelude Data.Unamb Control.Concurrent> race (threadDelay 5000000 >> return 3) readLn
3
2 голосов
/ 27 августа 2011

Одним из вариантов будет использование STM для обнаружения завершения, а затем явное уничтожение всех других потоков.Мы можем определить:

start_proc :: IO a -> IO (ThreadId, TVar (Maybe a))

start_proc job = do
  resultVar <- newTVarIO Nothing
  forkIO $ job >>= (atomically . writeTVar resultVar)
  return resultVar

Затем выполните:

any_parallel :: [IO a] -> IO a
any_parallel jobs = do
  (threads, vars) <- liftM unzip $ mapM start_proc jobs
  result <- atomically $ foldl orElse retry (map check_job vars)
  mapM_ killThread threads
  return result
  where
    check_job :: TVar (Maybe a) -> STM a
    check_job resultVar = do
      val <- readTVar resultVar
      case val of
        Nothing -> retry
        Just x  -> return x

Ключевым моментом здесь является то, что в первый раз, когда run_multiple проходит через набор переменных результата, все они Nothingи так это retry с.Монада STM записывает, на что она смотрела, и всякий раз, когда записывается какая-либо из них, транзакция STM перезапускается.В этот момент он видит, что один из TVar s не является Nothing, и может принять результат в этот момент.

Как только мы получим результат, мы, конечно, просто завершаем все потоки.Вероятно, это будет быстрее, чем проверка их внутреннего цикла на наличие какого-либо общего флага;в общем MVar (или что-у-вас) меньше конфликтов.

Обратите внимание, что killThread ждет, пока целевой поток достигнет «безопасной точки» (т. е. выделения памяти), прежде чем убить поток.Это не может быть гарантировано, если целевой поток имеет жесткий внутренний цикл, который не выполняет никакого выделения памяти.Возможно, вы захотите убедиться, что потоки периодически выполняют действие IO, которое вызывает распределение.

1 голос
/ 29 августа 2011

Другой способ сделать это - вручную спланировать ваш код, используя монаду для моделирования шагов вычислений .

Это может позволить вам вручную переключаться между различными вычислительными потоками, шагая по несколько за раз, пока один из них не закончится:

sum5 :: [ Computation (Int, Int) ]
sum5 = [ sum5' x 0 | x <- [ 0, 1.. ] ]
  where sum5' x y = if x + y == 5
                      then return (x,y)
                      else do 
                        y' <- return (y+1) 
                        sum5' x y'

prod6 :: [ Computation (Int, Int) ]
prod6 = [ prod6' x 0 | x <- [ 0, 1.. ] ]
  where prod6' x y = if x * y == 6
                      then return (x,y)
                      else do 
                        y' <- return (y+1) 
                        prod6' x y'

firstSolution :: [Computation a] -> Strategy a -> a
firstSolution cs s = head . toList . runComputation $ s cs

Тогда вы можете увидеть, как разрешить чередование вычислений (даже не завершающих)

ghci> firstSolution sum5 fair
(5,0)
ghci> firstSolution sum5 diagu
(0,5)
ghci> firstSolution sum5 diagd
(5,0)
ghci> firstSolution prod6 fair
^CInterrupted.
ghci> firstSolution prod6 diagu
(2,3)
ghci> firstSolution prod6 diagd
(3,2)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...