Надежно ждете несколько асин c? - PullRequest
3 голосов
/ 21 января 2020

Мой код должен запускать несколько потоков и отслеживать, какие из них завершены, а какие еще работают. Я планировал использовать waitAny или waitAnyCatch, но в документации 1005 *

было выбрано следующее соответствует первому завершенному Asyn c в списке.

Если это действительно случай, как можно когда-либо надежно отслеживать выполнение / завершение потоков?

Вот мой упрощенный код:

chan <- newChan
currentThreadsRef <- newIORef []

-- read jobs from a channel, and run them in parallel asyncs/threads,
-- while adding all threads references to currentThreadsRef
async $ do
  jobArgs <- readChan chan
  jobAsync <- async $ runJob jobArgs
  atomicallyModifyIORef' currentThreadsRef $ \x -> (jobAsync:x, ())

-- wait for jobs to be finished, and remove the thread refernece
-- from currentThreadsRef 
waitForAllJobs currentJobsRef = do
  (readIORef currentJobsRef) >>= \case
    [] -> logDebug "All jobs exited"
    currentJobs -> do
      (exitedJob, jobResult) <- waitAnyCatch currentJobs
      atomicallyModifyIORef currentJobsRef $ \x -> (filter (/= exitedjob) x, ())
      logDebug $ "Job completed with result=" <> show result
      waitForAllJobs currentJobsRef

PS: Хотя это может быть неочевидно из моего упрощенного кода выше, есть причина, по которой я не могу просто использовать mapConcurrently поверх входных данных. На самом деле, async-pool кажется подходящим для моего варианта использования, но даже у него та же проблема с waitAny.

1 Ответ

4 голосов
/ 22 января 2020

Вот программа, которая запускает 1000 асинхронных циклов, настроенных на завершение в течение секунды, и ожидает их всех в течение oop. Скомпилированный с ghc -O2 -threaded и запущенный с +RTS -N, он запускается примерно за 1,5 секунды, и ни один из асинхронных устройств не «теряется»:

import Control.Concurrent
import Control.Concurrent.Async
import qualified Data.Set as Set

main :: IO ()
main = do
  let n = 1000 :: Int
  asyncs0 <- mapM (\i -> async (threadDelay 1000000 >> return i)) [1..n]
  let loop :: Set.Set (Async Int) -> IO ()
      loop asyncs | null asyncs = return ()
                  | otherwise = do
                      (a, _i) <- waitAny (Set.toList asyncs)
                      loop (Set.delete a asyncs)
  loop (Set.fromList asyncs0)

Итак, как упоминалось в комментарии, документация ссылаясь на тот факт, что первый завершенный асин c в предоставленном списке является тем, который будет «возвращен», но если несколько асинхронных завершено, дополнительные не будут «забыты». Вам просто нужно удалить возвращенный асин c из списка и провести повторный опрос, и вы в конечном итоге получите их все.

Таким образом, у вас не должно возникнуть никаких проблем при ожидании нескольких асинхронных операций с * 1008. *.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...