Использование mapConcurrently для чтения стандартного ввода, выполнения HTTP-вызовов и параллельной записи в стандартный вывод - PullRequest
0 голосов
/ 12 мая 2018

Я пишу программу, которая считывает несколько URL-адресов (по одному на строку) из стандартного ввода, слегка адаптирует их и выполняет HTTP-запросы для каждого из этих нескольких URL-адресов параллельно.Ответы выводятся на стандартный вывод.Вот код:

{-# LANGUAGE OverloadedStrings #-}

module Main where

import Control.Monad
import Network.Wreq
import Control.Concurrent.MSem
import Control.Concurrent.Async
import Control.Concurrent (threadDelay)
import qualified Data.Traversable as T

main :: IO ()
main = void $ mapPool 4 (const processUrl) [1..]

mapPool :: T.Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapPool max f xs = do semaphore <- new max
                      mapConcurrently (with semaphore . f) xs

processUrl :: IO ()
processUrl = do param <- getLine
                response <- get (url ++ param)
                print response

url = "http://example.com/resources?param="

Параллелизм здесь жестко закодирован до четырех.Проблема возникает, когда некоторые действий ввода-вывода (HTTP-запросов) в пакете завершаются ошибкой.Согласно дизайну Control.Concurrent.Async.mapConcurrently, если одно действие не выполнено, остальные отменяются.В моем случае кажется, что последний пакет всегда будет неудачным, потому что ввод достигает EOF, возникает исключение, и программа выводит:

my-program-exe: <stdin>: hGetLine: end of file

Есть ли альтернатива для mapConcurrently, которая не отменяет все другие действия в случаеодин заканчивается исключением?Если нет, то есть ли лучший способ подойти к этому типу задач?

1 Ответ

0 голосов
/ 12 мая 2018

Есть ли альтернатива mapConcurrently, которая не отменяет все другое действие, если одно заканчивается исключением?

Здесь исключение вполне предсказуемо, поэтому, возможно, нам следует решить проблему в источнике, например проверка на EOF перед чтением каждой строки. Мы могли бы поместить это в IO (Maybe String) действие, которое использовало Nothing для обозначения EOF.

getLineMaybe :: IO (Maybe String)
getLineMaybe =
    do isEOF <- hIsEOF stdin
       if isEOF then return Nothing
                else Just <$> System.IO.getLine

В вашем примере есть проблема: одновременная запись в стандартный вывод может привести к искаженному результату . Процесс записи в стандартный вывод должен выполняться только из одного потока, и, возможно, также из стандартного ввода.

Возможно, мы могли бы иметь две (закрытые и ограниченные) параллельные очереди, одну, в которую мы помещаем строки, считанные из stdin, и другую, в которую мы помещаем обработанные результаты, которые будут записаны позже. При соединении одного с другим будет несколько рабочих потоков.

Использование пакетов async , stm и stm-chans

import           Control.Concurrent.Async
import           Control.Concurrent.STM
import           Control.Concurrent.STM.TVar
import qualified Control.Concurrent.STM.TBMQueue as Q -- closeable, bounded queue

и эта вспомогательная функция

untilNothing :: IO (Maybe a) -> (a -> IO ()) -> IO () -> IO ()
untilNothing action handler finalizer =
   let go = do mx <- action
               case mx of
                   Nothing -> finalizer
                   Just x -> do handler x
                                go
    in go     

мы можем написать общую функцию, подобную следующей

data ConcConf = ConcConf {
                           pendingQueueSize :: Int,
                           doneQueueSize :: Int,
                           concurrencyLevel :: Int
                         } deriving Show

concPipeline :: ConcConf -> IO (Maybe a) -> (a -> IO b) -> (b -> IO ()) -> IO ()
concPipeline conf reader transformer writer =
    do src <- atomically $ Q.newTBMQueue (pendingQueueSize conf)
       dst <- atomically $ Q.newTBMQueue (doneQueueSize conf)
       workersLeft <- atomically $ newTVar (concurrencyLevel conf)
       let gang = replicateConcurrently_ (concurrencyLevel conf)
           pipeline =
               untilNothing reader
                            (\a -> atomically $ Q.writeTBMQueue src a)
                            (atomically $ Q.closeTBMQueue src)
               `concurrently_`
               untilNothing (atomically $ Q.readTBMQueue dst)
                            writer
                            (pure ())
               `concurrently_`
               -- worker threads connecting reader and writer
               gang (untilNothing (atomically $ Q.readTBMQueue src)
                                  (\a -> do b <- transformer a
                                            atomically $ Q.writeTBMQueue dst b)
                                  -- last one remaining closes shop
                                  (atomically $ do modifyTVar' workersLeft pred
                                                   c <- readTVar workersLeft
                                                   if c == 0 then Q.closeTBMQueue dst
                                                             else pure ()))
       pipeline
...