Есть ли альтернатива mapConcurrently, которая не отменяет все
другое действие, если одно заканчивается исключением?
Здесь исключение вполне предсказуемо, поэтому, возможно, нам следует решить проблему в источнике, например проверка на EOF перед чтением каждой строки. Мы могли бы поместить это в IO (Maybe String)
действие, которое использовало Nothing
для обозначения EOF.
getLineMaybe :: IO (Maybe String)
getLineMaybe =
do isEOF <- hIsEOF stdin
if isEOF then return Nothing
else Just <$> System.IO.getLine
В вашем примере есть проблема: одновременная запись в стандартный вывод может привести к искаженному результату . Процесс записи в стандартный вывод должен выполняться только из одного потока, и, возможно, также из стандартного ввода.
Возможно, мы могли бы иметь две (закрытые и ограниченные) параллельные очереди, одну, в которую мы помещаем строки, считанные из stdin, и другую, в которую мы помещаем обработанные результаты, которые будут записаны позже. При соединении одного с другим будет несколько рабочих потоков.
Использование пакетов async , stm и stm-chans
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TVar
import qualified Control.Concurrent.STM.TBMQueue as Q -- closeable, bounded queue
и эта вспомогательная функция
untilNothing :: IO (Maybe a) -> (a -> IO ()) -> IO () -> IO ()
untilNothing action handler finalizer =
let go = do mx <- action
case mx of
Nothing -> finalizer
Just x -> do handler x
go
in go
мы можем написать общую функцию, подобную следующей
data ConcConf = ConcConf {
pendingQueueSize :: Int,
doneQueueSize :: Int,
concurrencyLevel :: Int
} deriving Show
concPipeline :: ConcConf -> IO (Maybe a) -> (a -> IO b) -> (b -> IO ()) -> IO ()
concPipeline conf reader transformer writer =
do src <- atomically $ Q.newTBMQueue (pendingQueueSize conf)
dst <- atomically $ Q.newTBMQueue (doneQueueSize conf)
workersLeft <- atomically $ newTVar (concurrencyLevel conf)
let gang = replicateConcurrently_ (concurrencyLevel conf)
pipeline =
untilNothing reader
(\a -> atomically $ Q.writeTBMQueue src a)
(atomically $ Q.closeTBMQueue src)
`concurrently_`
untilNothing (atomically $ Q.readTBMQueue dst)
writer
(pure ())
`concurrently_`
-- worker threads connecting reader and writer
gang (untilNothing (atomically $ Q.readTBMQueue src)
(\a -> do b <- transformer a
atomically $ Q.writeTBMQueue dst b)
-- last one remaining closes shop
(atomically $ do modifyTVar' workersLeft pred
c <- readTVar workersLeft
if c == 0 then Q.closeTBMQueue dst
else pure ()))
pipeline