Monitor.Wait / Состояние импульсной гонки на многопоточном сервере - PullRequest
8 голосов
/ 12 мая 2009

У меня проблема с блокированными Monitor.Wait и Monitor.Pulse в многопоточном TCP-сервере. Чтобы продемонстрировать мои проблемы, вот мой код сервера:

public class Server
{
    TcpListener listener;
    Object sync;
    IHandler handler;
    bool running;

    public Server(IHandler handler, int port)
    {
        this.handler = handler;
        IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];
        listener = new TcpListener(address, port);
        sync = new Object();
        running = false;
    }

    public void Start()
    {
        Thread thread = new Thread(ThreadStart);
        thread.Start();
    }

    public void Stop()
    {
        lock (sync)
        {
            listener.Stop();
            running = false;
            Monitor.Pulse(sync);
        }
    }

    void ThreadStart()
    {
        if (!running)
        {
            listener.Start();
            running = true;
            lock (sync)
            {
                while (running)
                {
                    try
                    {
                        listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                        Monitor.Wait(sync);  // Release lock and wait for a pulse
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e.Message);
                    }
                }
            }
        }
    }

    void Accept(IAsyncResult result)
    {
        // Let the server continue listening
        lock (sync)
        {
            Monitor.Pulse(sync);
        } 

        if (running)
        {
            TcpListener listener = (TcpListener)result.AsyncState;
            using (TcpClient client = listener.EndAcceptTcpClient(result))
            {
                handler.Handle(client.GetStream());
            }
        }
    }
}

А вот мой код клиента:

class Client
{
    class EchoHandler : IHandler
    {
        public void Handle(Stream stream)
        {
            System.Console.Out.Write("Echo Handler: ");
            StringBuilder sb = new StringBuilder();
            byte[] buffer = new byte[1024];
            int count = 0;
            while ((count = stream.Read(buffer, 0, 1024)) > 0)
            {
                sb.Append(Encoding.ASCII.GetString(buffer, 0, count));
            }
            System.Console.Out.WriteLine(sb.ToString());
            System.Console.Out.Flush();
        }
    }

    static IPAddress localhost = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];

    public static int Main()
    {
        Server server1 = new Server(new EchoHandler(), 1000);
        Server server2 = new Server(new EchoHandler(), 1001);

        server1.Start();
        server2.Start();

        Console.WriteLine("Press return to test...");
        Console.ReadLine();

        // Note interleaved ports
        SendMsg("Test1", 1000);
        SendMsg("Test2", 1001);
        SendMsg("Test3", 1000);
        SendMsg("Test4", 1001);
        SendMsg("Test5", 1000);
        SendMsg("Test6", 1001);
        SendMsg("Test7", 1000);

        Console.WriteLine("Press return to terminate...");
        Console.ReadLine();

        server1.Stop();
        server2.Stop();

        return 0;
    }

    public static void SendMsg(String msg, int port)
    {
        IPEndPoint endPoint = new IPEndPoint(localhost, port);

        byte[] buffer = Encoding.ASCII.GetBytes(msg);
        using (Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        {
            s.Connect(endPoint);
            s.Send(buffer);
        }
    }
}

Клиент отправляет семь сообщений, а сервер печатает только четыре:

Press return to test...

Press return to terminate...
Echo Handler: Test1
Echo Handler: Test3
Echo Handler: Test2
Echo Handler: Test4

Я подозреваю, что монитор запутался, допустив появление Pulse (в методе Accept сервера) до того, как произойдет Wait (в методе ThreadStart), даже если ThreadStart все еще должен блокируйте объект sync до тех пор, пока он не вызовет Monitor.Wait(), а затем метод Accept сможет получить блокировку и отправить ее Pulse. Если вы закомментируете эти две строки в методе Stop() сервера:

//listener.Stop();
//running = false;

Остальные сообщения появляются при вызове метода Stop() сервера (то есть при пробуждении объекта sync сервера он отправляет оставшиеся входящие сообщения). Мне кажется, что это может произойти только в состоянии гонки между методами ThreadStart и Accept, но блокировка вокруг объекта sync должна предотвратить это.

Есть идеи?

Большое спасибо, Саймон.

пс. Обратите внимание, что я знаю, что вывод выводится из строя и т. Д., Я специально спрашиваю о состоянии гонки между блокировками и монитором. Ура, SH.

1 Ответ

5 голосов
/ 12 мая 2009

Проблема в том, что вы используете Pulse / Wait в качестве сигнала. Правильный сигнал, такой как AutoResetEvent, имеет состояние, при котором он остается сигнальным, пока поток не вызовет WaitOne (). Вызов Pulse без каких-либо ожидающих потоков станет пустяком.

Это сочетается с тем, что один и тот же поток может многократно блокировать блокировку. Поскольку вы используете асинхронное программирование, обратный вызов Accept может быть вызван тем же потоком, что и BeginAcceptTcpClient.

Позвольте мне проиллюстрировать. Я закомментировал второй сервер и изменил код на вашем сервере.

void ThreadStart()
{
    if (!running)
    {
        listener.Start();
        running = true;
        lock (sync)
        {
            while (running)
            {
                try
                {
                    Console.WriteLine("BeginAccept [{0}]", 
                        Thread.CurrentThread.ManagedThreadId);
                    listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                    Console.WriteLine("Wait [{0}]", 
                        Thread.CurrentThread.ManagedThreadId);
                    Monitor.Wait(sync);  // Release lock and wait for a pulse
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
        }
    }
}

void Accept(IAsyncResult result)
{
    // Let the server continue listening
    lock (sync)
    {
        Console.WriteLine("Pulse [{0}]", 
            Thread.CurrentThread.ManagedThreadId);
        Monitor.Pulse(sync);
    }
    if (running)
    {
        TcpListener localListener = (TcpListener)result.AsyncState;
        using (TcpClient client = localListener.EndAcceptTcpClient(result))
        {
            handler.Handle(client.GetStream());
        }
    }
}

Результат моего бега показан ниже. Если вы запустите этот код самостоятельно, значения будут отличаться, но в целом он будет таким же.

Press return to test...
BeginAccept [3]
Wait [3]

Press return to terminate...
Pulse [5]
BeginAccept [3]
Pulse [3]
Echo Handler: Test1
Echo Handler: Test3
Wait [3]

Как вы можете видеть, есть два вызванных импульса, один из отдельного потока (импульс [5]), который пробуждает первое ожидание. Затем поток 3 выполняет другой BeginAccept, но, имея ожидающие входящие соединения, этот поток решает немедленно вызвать обратный вызов Accept. Поскольку Accept вызывается тем же потоком, Lock (синхронизация) не блокируется, но Pulse [3] немедленно в пустой очереди потока.

Вызываются два обработчика и обрабатывают два сообщения.

Все в порядке, и ThreadStart снова начинает работать и переходит в состояние ожидания на неопределенное время.

Теперь основная проблема заключается в том, что вы пытаетесь использовать монитор в качестве сигнала. Поскольку он не запоминает состояние, второй импульс получает потерян.

Но для этого есть простое решение. Используйте AutoResetEvents, который является правильным сигналом, и он запомнит свое состояние.

public Server(IHandler handler, int port)
{
    this.handler = handler;
    IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];
    listener = new TcpListener(address, port);
    running = false;
    _event = new AutoResetEvent(false);
}

public void Start()
{
    Thread thread = new Thread(ThreadStart);
    thread.Start();
}

public void Stop()
{
    listener.Stop();
    running = false;
    _event.Set();
}

void ThreadStart()
{
    if (!running)
    {
        listener.Start();
        running = true;
        while (running)
        {
            try
            {
                listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                _event.WaitOne();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
    }
}

void Accept(IAsyncResult result)
{
    // Let the server continue listening
    _event.Set();
    if (running)
    {
        TcpListener localListener = (TcpListener) result.AsyncState;
        using (TcpClient client = localListener.EndAcceptTcpClient(result))
        {
            handler.Handle(client.GetStream());
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...