Является ли Network.Socket программированием на Haskell «Одновременно, асинхронно, параллельно, неблокирующим» с использованием forkIO - PullRequest
0 голосов
/ 20 сентября 2018

Пожалуйста, я хочу понять, обрабатывает ли RTS GHC одновременную блокировку операций чтения / записи, т. Е. Предположим, что мой гипотетический сервер может обрабатывать только потоки CPU 1000 параллельно, если все 1000 forkIO заблокированы из-за блокировки сокетов постоянных клиентов чтения / записи инеобходимо обработать еще 500 запросов.

Option-A> Должен ли другой запрос 500 ждать, пока не завершится процесс 1000 forkIO.

Option-B> Haskell внутренне обрабатывает (т.е.Одновременно, асинхронно, параллельно, неблокирующе) 1000 + 500 forkIO, эффективно используя все 1000 потоков ЦП.

Только для информации, я прочитал много обучающих программ по сокетам (и блогов) для C и Haskell (Сеть).Сокет), чтобы иметь представление о том, как Haskell (forkIO) и C обрабатывает (то есть одновременно, асинхронно, параллельно, неблокирует), но у меня нет четкого понимания того, как Haskell делает это на самом деле.

Ссылка:

https://github.com/lpeterse/haskell-socket/issues/15#issuecomment-224382491

1 Ответ

0 голосов
/ 20 сентября 2018

Обратите внимание, что forkIO порождает зеленый поток, а не поток O / S, поэтому, если вы создаете 1000 forkIO потоков для обработки одновременных запросов, это не соответствует 1000 отдельным O / S (или "CPU")потоки.

В любом случае, да, RTS обрабатывает несколько блокирующих вызовов чтения / записи одновременно, не связывая потоки O / S.Фактически, по умолчанию, когда вы компилируете и запускаете программу на Haskell без флага -threaded или без опции -N RTS, она запускает все в одном O / S-потоке, даже те вещи, которые вы «разветвляете»"с forkIO, и несколько зеленых потоков могут по-прежнему блокироваться, не блокируя (только) поток O / S.

Вы также никогда бы не запустили сервер Haskell в 1000 отдельных потоках O / S.RTS может планировать тысячи зеленых потоков в небольшом пуле потоков O / S гораздо более эффективно, чем запуск каждого зеленого потока в своем собственном потоке O / S, потому что переключение между зелеными потоками не требует дорогостоящего переключения контекста O / S.

Вы можете найти эту статью 2012 года о библиотеке веб-сервера Warp полезной.В частности, они сравнивают и сопоставляют несколько возможных архитектур: один поток ЦП на запрос (т. Е. Проект, который вы представляете), управляемую событиями архитектуру, гибрид с одним процессом обработки событий на ядро ​​и легковесную модель Haskell.потоки, работающие в пуле потоков O / S.Обратите внимание, что Warp использует Network.Socket под капотом.В своих тестах они обрабатывали запросы для 1000 одновременных клиентов, используя потоки от 1 до 10.

Если вы хотите получить конкретное доказательство того, что потоки forkIO не будут блокироваться, вотИгрушечная программа:

import Control.Monad
import Control.Concurrent
import Network.Socket hiding (recv)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as BS

forks = 10

main = withSocketsDo $ do
  s <- socket AF_INET Datagram defaultProtocol
  bind s (SockAddrInet 6667 (tupleToHostAddress (127,0,0,1)))
  replicateM_ forks $ forkIO (BS.putStrLn =<< recv s 4096)
  let loop = (putStrLn =<< getLine) >> loop
  loop

Если вы скомпилируете и запустите это без многопоточности:

$ stack ghc -- -O2 Socket.hs && ./Socket

появится 10 ожидающих forkIO потоков, а затем введите getLine / putStrLnЦикл, который будет отражать ваш ввод обратно к вам.Тем временем вы можете использовать netcat или ваш любимый сетевой инструмент для отправки запросов ожидающим потокам:

$ echo -n 'request' | nc -uw0 localhost 6667

, которые также будут отображаться сервером.После 10 запросов у вас будут исчерпаны ожидающие потоки, и он больше не будет отвечать на сетевые запросы.

Затем вы можете увеличить потоки с помощью fork = 10000, чтобы создать 10000 ожидающих потоков.Пока они ожидают, основной цикл getLine / putStrLn продолжит работать без сбоев.

Все это происходит в одном потоке O / S, что можно проверить, посмотрев на ps -Lf или что-то еще.

В комментарии возник вопрос о том, нужно ли больше потоков, если одновременно используется несколько сокетов и программа скомпилирована с -threaded.Следующая тестовая программа:

import Control.Monad
import Control.Concurrent
import Network.Socket hiding (recv)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as BS

forks = 50

main = withSocketsDo $ do
  forM_ [0..forks-1] $ \i -> forkIO $ do
    s <- socket AF_INET Datagram defaultProtocol
    bind s (SockAddrInet (6667+i) (tupleToHostAddress (127,0,0,1)))
    BS.putStrLn =<< recv s 4096
  let loop = (putStrLn =<< getLine) >> loop
  loop

создает 50 отдельных сокетов на портах с 6667 по 6716 и ожидает их.Если он скомпилирован без потоков, он без затруднений запускается в одном потоке O / S.Если он скомпилирован с многопоточностью и снабжен подсчетом возможностей, превышающим единицу, например:

$ stack ghc -- -O2 -threaded Socket.hs
$ ./Socket +RTS -N4

, он, по-видимому, запускает 11 рабочих потоков (которые я считаю «основным потоком», четыре возможности плюс шесть резервных рабочих).потоки, как указано константой MAX_SPARE_WORKERS в источнике RTS), которые совместно используют ожидание на этих 50 сокетах.

Кроме того, в коде Network.Socket это выполняется путем вызова recvнапример, в конечном итоге реализовано как:

throwSocketErrorWaitRead sock "..." $
    c_recv s (castPtr ptr) (fromIntegral nbytes) 0

с оболочкой throwSocketErrorWaitRead, определенной как:

throwSocketErrorWaitRead :: (Eq a, Num a) => 
     Socket -> String -> IO a -> IO a
throwSocketErrorWaitRead sock name io =
    throwSocketErrorIfMinus1RetryMayBlock name
        (threadWaitRead $ fromIntegral $ fdSocket sock)
        io

и throwSocketErrorIfMinus1RetryMayBlock, задокументированные так:

throwSocketErrorIfMinus1RetryMayBlock
    :: (Eq a, Num a)
    => String  -- ^ textual description of the location
    -> IO b    -- ^ action to execute before retrying if an
               --   immediate retry would block
    -> IO a    -- ^ the 'IO' operation to be executed
    -> IO a

Все это немного сложно, но в итоге вызов обёрток c_recv with является действительным системным вызовом recv.Он никогда не блокируется, потому что сокет настроен как неблокирующий, и если он возвращается с кодом ошибки, указывающим, что он будет блокировать, вызов threadWaitRead используется для предупреждения RTS о том, что этот зеленый поток должен спать доданные доступны для чтения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...