Почему моя параллельная программа на Haskell завершается преждевременно? - PullRequest
5 голосов
/ 25 ноября 2011

У меня есть UDP-сервер, который отражает каждое пинг-сообщение, которое он получает (я думаю, это работает хорошо). На стороне клиента я бы хотел сделать две вещи:

  1. убедитесь, что я выдал N (например, 10000) сообщений, а
  2. подсчитать количество правильно полученных ответов.

Похоже, что либо из-за природы UDP, либо из-за forkIO мой приведенный ниже клиентский код преждевременно завершается / вообще не учитывается.

Также я очень удивлен, увидев, что функция tryOnePing возвращает 250 раз Int 4. Почему это может быть?

main = withSocketsDo $ do
        s <- socket AF_INET Datagram defaultProtocol
        hostAddr <- inet_addr host
        thread <- forkIO $ receiveMessages s
        -- is there any better way to eg to run that in parallel and make sure
        -- that sending/receiving are asynchronous? 


        -- forM_ [0 .. 10000] $ \i -> do
              -- sendTo s "ping" (SockAddrInet port hostAddr)
        -- actually this would be preferred since I can discard the Int 4 that
        -- it returns but forM or forM_ are out of scope here?

        let tryOnePing i = sendTo s "ping" (SockAddrInet port hostAddr)
        pings <- mapM tryOnePing [0 .. 1000]
        let c = length $ filter (\x -> x==4) pings

        -- killThread thread
        -- took that out to make sure the function receiveMessages does not
        -- end prematurely. still seems that it does

        sClose s
        print c
        -- return()

receiveMessages :: Socket -> IO ()
receiveMessages socket = forever $ do
        -- also tried here forM etc. instead of forever but no joy
        let recOnePing i = recv socket 1024
        msg <- mapM recOnePing [0 .. 1000]
        let r = length $ filter (\x -> x=="PING") msg
        print r
        print "END"

1 Ответ

6 голосов
/ 25 ноября 2011

Основная проблема заключается в том, что, когда ваш основной поток завершает работу, все остальные потоки уничтожаются автоматически.Вы должны заставить основной поток ждать receiveMessages thread, иначе он, скорее всего, просто завершит работу, прежде чем будут получены какие-либо ответы.Один простой способ сделать это - использовать MVar.

. MVar - это синхронизированная ячейка, которая может быть пустой или содержать ровно одно значение.Текущий поток заблокируется, если он попытается извлечь из пустого MVar или вставить в полный.В этом случае нам не важно само значение, поэтому мы просто сохраним ().

Начнем с пустого MVar.Затем основной поток разветвляется из потока получателя, отправляет все пакеты и пытается получить значение из MVar.

import Control.Concurrent.MVar

main = withSocketsDo $ do
    -- prepare socket, same as before

    done <- newEmptyMVar

    -- we need to pass the MVar to the receiver thread so that
    -- it can use it to signal us when it's done
    forkIO $ receiveMessages sock done

    -- send pings, same as before

    takeMVar done    -- blocks until receiver thread is done

В потоке получателя мы получим все сообщения, а затемвставьте () в MVar, чтобы сигнализировать, что мы закончили получать.

receiveMessages socket done = do
    -- receive messages, same as before

    putMVar done ()  -- allows the main thread to be unblocked

Это решает основную проблему, и программа отлично работает на моем ноутбуке с Ubuntu, но есть еще паравещи, о которых вы хотите позаботиться.

  • sendTo не гарантирует отправку всей строки.Вам нужно проверить возвращаемое значение, чтобы увидеть, сколько было отправлено, и повторить попытку, если не все было отправлено.Это может произойти даже для короткого сообщения, например "ping", если буфер отправки заполнен.

  • recv требует подключенного сокета.Вместо этого вы захотите использовать recvFrom.(Хотя он все еще работает на моем ПК по неизвестной причине).

  • Печать на стандартный вывод не синхронизирована, поэтому вы можете изменить это, чтобы использовать MVarсообщить количество полученных пакетов вместо ().Таким образом, вы можете сделать весь вывод из основного потока.Также можно использовать другой MVar в качестве мьютекса для управления доступом к стандартному выводу.

Наконец, я рекомендую прочитать документацию Network.Socket , Control.Concurrent и Control.Concurrent.MVar тщательно.Большая часть моего ответа собрана из информации, найденной там.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...