Строгие методы оценки для одновременных каналов в Haskell - PullRequest
4 голосов
/ 06 марта 2011

Я играю с потоками Haskell и сталкиваюсь с проблемой передачи лениво оцененных значений по каналу.Например, с N рабочими потоками и 1 выходным потоком рабочие сообщают о неоцененной работе, и выходной поток в итоге выполняет эту работу за них.

Я читал об этой проблеме в различной документации и видел различные решенияно я нашел только одно решение, которое работает, а остальные нет.Ниже приведен код, в котором рабочие потоки запускают вычисления, которые могут занять много времени.Я запускаю потоки в порядке убывания, чтобы первый поток занимал самое длинное, а более поздние потоки заканчивались раньше.

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan   -- .Strict
import Control.Concurrent.MVar
import Control.Exception (finally, evaluate)
import Control.Monad (forM_)
import Control.Parallel.Strategies (using, rdeepseq)

main = (>>=) newChan $ (>>=) (newMVar []) . run

run :: Chan (Maybe String) -> MVar [MVar ()] -> IO ()
run logCh statVars = do
  logV <- spawn1 readWriteLoop
  say "START"
  forM_ [18,17..10] $ spawn . busyWork
  await
  writeChan logCh Nothing -- poison the logger
  takeMVar logV
  putStrLn "DONE"
  where
    say mesg = force mesg >>= writeChan logCh . Just

    force s = mapM evaluate s  -- works
--    force s = return $ s `using` rdeepseq  -- no difference
--    force s = return s -- no-op; try this with strict channel

    busyWork = say . show . sum . filter odd . enumFromTo 2 . embiggen
    embiggen i = i*i*i*i*i

    readWriteLoop = readChan logCh >>= writeReadLoop
    writeReadLoop Nothing = return ()
    writeReadLoop (Just mesg) = putStrLn mesg >> readWriteLoop

    spawn1 action = do
      v <- newEmptyMVar
      forkIO $ action `finally` putMVar v ()
      return v

    spawn action = do
      v <- spawn1 action
      modifyMVar statVars $ \vs -> return (v:vs, ())

    await = do
      vs <- modifyMVar statVars $ \vs -> return ([], vs)
      mapM_ takeMVar vs

При использовании большинства методов результаты выдаются в порядке появления;то есть самое длительное вычисление первым.Я понимаю, что это означает, что выходной поток выполняет всю работу:

-- results in order spawned (longest-running first = broken)
START
892616806655
503999185040
274877906943
144162977343
72313663743
34464808608
15479341055
6484436675
2499999999
DONE

Я думал, что ответом будет строгие каналы, , но они не сработали.Я понимаю, что WHNF для строк является недостаточным, потому что это просто заставит внешний конструктор (ноль или минусы для первого символа строки).rdeepseq должен полностью оценить, но это не имеет значения.Единственное, что я нашел, это работает - это сопоставить Control.Exception.evaluate :: a -> IO a со всеми символами в строке.(См. Комментарии функции force в коде для нескольких различных альтернатив.) Вот результат с Control.Exception.evaluate:

-- results in order finished (shortest-running first = correct)
START
2499999999
6484436675
15479341055
34464808608
72313663743
144162977343
274877906943
503999185040
892616806655
DONE

Так почему же строгие каналы или rdeepseq не дают такого результата?Есть ли другие техники?Я неправильно понимаю, почему первый результат нарушен?

1 Ответ

5 голосов
/ 06 марта 2011

Здесь происходит две проблемы.

Причина, по которой первая попытка (с использованием явного rnf) не работает, заключается в том, что, используя return, вы создали thunk, которыйполностью оценивает себя, когда оценивается, но сам thunk не оценивается.Обратите внимание, что тип оценки - a -> IO a: тот факт, что он возвращает значение в IO, означает, что evaluate может наложить порядок:

return (error "foo")   >> return 1 == return 1
evaluate (error "foo") >> return 1 == error "foo"

В результате этот код:

force s = evaluate $ s `using` rdeepseq

будет работать (например, будет иметь то же поведение, что и mapM_ evaluate s).


Случай использования строгих каналов немного сложнее, но я полагаю, что это связано сошибка в строгом параллелизме.Дорогие вычисления на самом деле выполняются в рабочих потоках, но они не приносят вам большой пользы (вы можете проверить это явно, скрыв некоторые асинхронные исключения в ваших строках и увидев, в каком потоке поверхности исключений).

В чем ошибка?Давайте посмотрим на код строгого writeChan:

writeChan :: NFData a => Chan a -> a -> IO ()
writeChan (Chan _read write) val = do
  new_hole <- newEmptyMVar
  modifyMVar_ write $ \old_hole -> do
    putMVar old_hole $! ChItem val new_hole
    return new_hole

. Мы видим, что modifyMVar_ вызывается на write, прежде чем мы оценим thunk.Последовательность операций тогда:

  1. writeChan введено
  2. Мы takeMVar write (блокирование любого другого, кто хочет записать на канал)
  3. Мыоцените дорогой thunk
  4. Мы поместили дорогой thunk на канал
  5. We putMVar write, разблокируя все остальные темы

Вы не видите этогоповедение с evaluate вариантами, потому что они выполняют оценку до того, как блокировка получена.

Я отправлю Дону сообщение об этом и посмотрю, согласен ли он, что такое поведение является неоптимальным.

Дон соглашается, что это поведение неоптимально.Мы работаем над патчем.

...