Обратите внимание, что forkIO
порождает зеленый поток, а не поток O / S, поэтому, если вы создаете 1000 forkIO
потоков для обработки одновременных запросов, это не соответствует 1000 отдельным O / S (или "CPU")потоки.
В любом случае, да, RTS обрабатывает несколько блокирующих вызовов чтения / записи одновременно, не связывая потоки O / S.Фактически, по умолчанию, когда вы компилируете и запускаете программу на Haskell без флага -threaded
или без опции -N
RTS, она запускает все в одном O / S-потоке, даже те вещи, которые вы «разветвляете»"с forkIO
, и несколько зеленых потоков могут по-прежнему блокироваться, не блокируя (только) поток O / S.
Вы также никогда бы не запустили сервер Haskell в 1000 отдельных потоках O / S.RTS может планировать тысячи зеленых потоков в небольшом пуле потоков O / S гораздо более эффективно, чем запуск каждого зеленого потока в своем собственном потоке O / S, потому что переключение между зелеными потоками не требует дорогостоящего переключения контекста O / S.
Вы можете найти эту статью 2012 года о библиотеке веб-сервера Warp полезной.В частности, они сравнивают и сопоставляют несколько возможных архитектур: один поток ЦП на запрос (т. Е. Проект, который вы представляете), управляемую событиями архитектуру, гибрид с одним процессом обработки событий на ядро и легковесную модель Haskell.потоки, работающие в пуле потоков O / S.Обратите внимание, что Warp использует Network.Socket
под капотом.В своих тестах они обрабатывали запросы для 1000 одновременных клиентов, используя потоки от 1 до 10.
Если вы хотите получить конкретное доказательство того, что потоки forkIO
не будут блокироваться, вотИгрушечная программа:
import Control.Monad
import Control.Concurrent
import Network.Socket hiding (recv)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as BS
forks = 10
main = withSocketsDo $ do
s <- socket AF_INET Datagram defaultProtocol
bind s (SockAddrInet 6667 (tupleToHostAddress (127,0,0,1)))
replicateM_ forks $ forkIO (BS.putStrLn =<< recv s 4096)
let loop = (putStrLn =<< getLine) >> loop
loop
Если вы скомпилируете и запустите это без многопоточности:
$ stack ghc -- -O2 Socket.hs && ./Socket
появится 10 ожидающих forkIO
потоков, а затем введите getLine
/ putStrLn
Цикл, который будет отражать ваш ввод обратно к вам.Тем временем вы можете использовать netcat или ваш любимый сетевой инструмент для отправки запросов ожидающим потокам:
$ echo -n 'request' | nc -uw0 localhost 6667
, которые также будут отображаться сервером.После 10 запросов у вас будут исчерпаны ожидающие потоки, и он больше не будет отвечать на сетевые запросы.
Затем вы можете увеличить потоки с помощью fork = 10000
, чтобы создать 10000 ожидающих потоков.Пока они ожидают, основной цикл getLine
/ putStrLn
продолжит работать без сбоев.
Все это происходит в одном потоке O / S, что можно проверить, посмотрев на ps -Lf
или что-то еще.
В комментарии возник вопрос о том, нужно ли больше потоков, если одновременно используется несколько сокетов и программа скомпилирована с -threaded
.Следующая тестовая программа:
import Control.Monad
import Control.Concurrent
import Network.Socket hiding (recv)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as BS
forks = 50
main = withSocketsDo $ do
forM_ [0..forks-1] $ \i -> forkIO $ do
s <- socket AF_INET Datagram defaultProtocol
bind s (SockAddrInet (6667+i) (tupleToHostAddress (127,0,0,1)))
BS.putStrLn =<< recv s 4096
let loop = (putStrLn =<< getLine) >> loop
loop
создает 50 отдельных сокетов на портах с 6667 по 6716 и ожидает их.Если он скомпилирован без потоков, он без затруднений запускается в одном потоке O / S.Если он скомпилирован с многопоточностью и снабжен подсчетом возможностей, превышающим единицу, например:
$ stack ghc -- -O2 -threaded Socket.hs
$ ./Socket +RTS -N4
, он, по-видимому, запускает 11 рабочих потоков (которые я считаю «основным потоком», четыре возможности плюс шесть резервных рабочих).потоки, как указано константой MAX_SPARE_WORKERS
в источнике RTS), которые совместно используют ожидание на этих 50 сокетах.
Кроме того, в коде Network.Socket
это выполняется путем вызова recv
например, в конечном итоге реализовано как:
throwSocketErrorWaitRead sock "..." $
c_recv s (castPtr ptr) (fromIntegral nbytes) 0
с оболочкой throwSocketErrorWaitRead
, определенной как:
throwSocketErrorWaitRead :: (Eq a, Num a) =>
Socket -> String -> IO a -> IO a
throwSocketErrorWaitRead sock name io =
throwSocketErrorIfMinus1RetryMayBlock name
(threadWaitRead $ fromIntegral $ fdSocket sock)
io
и throwSocketErrorIfMinus1RetryMayBlock
, задокументированные так:
throwSocketErrorIfMinus1RetryMayBlock
:: (Eq a, Num a)
=> String -- ^ textual description of the location
-> IO b -- ^ action to execute before retrying if an
-- immediate retry would block
-> IO a -- ^ the 'IO' operation to be executed
-> IO a
Все это немного сложно, но в итоге вызов обёрток c_recv
with является действительным системным вызовом recv
.Он никогда не блокируется, потому что сокет настроен как неблокирующий, и если он возвращается с кодом ошибки, указывающим, что он будет блокировать, вызов threadWaitRead
используется для предупреждения RTS о том, что этот зеленый поток должен спать доданные доступны для чтения.