.NET вопрос об асинхронных операциях с сокетами и формировании сообщений - PullRequest
9 голосов
/ 19 мая 2011

Я всюду искал примеры того, как бороться с кадрированием TCP-сообщений.Я вижу много примеров, когда NetworkStreams передаются в объект StreamReader или StreamWriter и затем используют методы ReadLine или WriteLine для сообщений с разделителями '\ n'.Мой протокол приложения содержит сообщения, оканчивающиеся на '\ n', поэтому NetworkStream, похоже, является подходящим вариантом.Тем не менее, я не могу найти конкретных примеров того, как правильно обрабатывать все это в сочетании с асинхронными сокетами.Когда ReceiveCallback () вызывается ниже, как мне реализовать классы NetworkStream и StreamReader для работы с кадрированием сообщений?Согласно тому, что я прочитал, я могу получить часть одного сообщения за один прием, а остальную часть сообщения (включая \ n) в следующем приеме.Означает ли это, что я могу получить конец одного сообщения и часть следующего сообщения?Конечно, должен быть более простой способ справиться с этим.

У меня есть следующий код:

    private void StartRead(Socket socket)
    {
        try
        {
            StateObject state = new StateObject();
            state.AsyncSocket = socket;

            socket.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
        }
        catch (SocketException)
        {
            m_Socket.Shutdown(SocketShutdown.Both);
            Disconnect();
        }
    }

    private void ReceiveCallback(IAsyncResult ar)
    {
        try
        {
            StateObject state = (StateObject)ar.AsyncState;

            int bytes_read = state.AsyncSocket.EndReceive(ar);

            char[] chars = new char[bytes_read + 1];
            System.Text.Decoder decoder = System.Text.Encoding.UTF8.GetDecoder();
            int charLength = decoder.GetChars(state.Buffer, 0, bytes_read, chars, 0);

            String data = new String(chars);

            ParseMessage(data);

            StartRead(state.AsyncSocket);
        }
        catch (SocketException)
        {
            m_Socket.Shutdown(SocketShutdown.Both);
            Disconnect();
        }
    }

Ответы [ 3 ]

3 голосов
/ 19 мая 2011

Префикс кусков с длиной лучше, чем использование символа разделителя. Вам не нужно иметь дело с каким-либо видом экранирования, чтобы отправлять данные с новой строки таким образом.

Этот ответ может быть не актуален для вас сейчас, потому что он использует функции из AsyncCTP , которые будут только в следующей версии .net. Тем не менее, это делает вещи намного более краткими. По сути, вы пишете именно тот код, который вы делаете для синхронного случая, но вставляете операторы «await» там, где есть асинхронные вызовы.

    public static async Task<Byte[]> ReadChunkAsync(this Stream me) {
        var size = BitConverter.ToUInt32(await me.ReadExactAsync(4), 0);
        checked {
            return await me.ReadExactAsync((int)size);
        }
    }

    public static async Task<Byte[]> ReadExactAsync(this Stream me, int count) {
        var buf = new byte[count];
        var t = 0;
        while (t < count) {
            var n = await me.ReadAsync(buf, t, count - t);
            if (n <= 0) {
                if (t > 0) throw new IOException("End of stream (fragmented)");
                throw new IOException("End of stream");
            }
            t += n;
        }
        return buf;
    }

    public static void WriteChunk(this Stream me, byte[] buffer, int offset, int count) {
        me.Write(BitConverter.GetBytes(count), 0, 4);
        me.Write(buffer, offset, count);
    }
1 голос
/ 19 мая 2011

Обычно вы создаете буфер и каждый раз, когда получаете данные, добавляете эти данные в буфер и определяете, получили ли вы одно или несколько полных сообщений.

Между ReceiveCallback и StartReadвы не будете получать никаких асинхронных сообщений (входящие данные будут автоматически буферизироваться на уровне сокетов), поэтому это идеальное место для проверки полных сообщений и удаления их из буфера.

Возможны все варианты, включая получениеконец сообщения 1, плюс сообщение 2, плюс начало сообщения 3, все в одном фрагменте.

Я не рекомендую UTF8-декодирование фрагмента, так как один символ UTF8 может состоять из двух байтов,и если они разбиты на части, ваши данные могут быть повреждены.В этом случае вы можете сохранить байт [] - буфер (MemoryStream?) И разбить сообщения на байт 0x0A.

0 голосов
/ 20 мая 2011

Хорошо, вот что я в итоге сделал. Я создал поток чтения, который создает NetworkStream и StreamReader на основе сетевого потока. Затем я использую StreamReader.ReadLine для чтения строк таким образом. Это синхронный вызов, но он находится в своем собственном потоке. Кажется, работает намного лучше. Я должен был реализовать это, так как это наш протокол для приложения (сообщения с разделителями новой строки). Я знаю, что другие люди будут чертовски искать ответ, как я, вот соответствующий код чтения в моем классе Клиента:

public class Client
{
    Socket              m_Socket;

    EventWaitHandle     m_WaitHandle;
    readonly object     m_Locker;
    Queue<IEvent>       m_Tasks;
    Thread              m_Thread;

    Thread              m_ReadThread;

    public Client()
    {
        m_WaitHandle = new AutoResetEvent(false);
        m_Locker = new object();
        m_Tasks = new Queue<IEvent>();

        m_Thread = new Thread(Run);
        m_Thread.IsBackground = true;
        m_Thread.Start();
    }

    public void EnqueueTask(IEvent task)
    {
        lock (m_Locker)
        {
            m_Tasks.Enqueue(task);
        }

        m_WaitHandle.Set();
    }

    private void Run()
    {
        while (true)
        {
            IEvent task = null;

            lock (m_Locker)
            {
                if (m_Tasks.Count > 0)
                {
                    task = m_Tasks.Dequeue();

                    if (task == null)
                    {
                        return;
                    }
                }
            }

            if (task != null)
            {
                task.DoTask(this);
            }
            else
            {
                m_WaitHandle.WaitOne();
            }
        }
    }

    public void Connect(string hostname, int port)
    {
        try
        {
            m_Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

            IPAddress[] IPs = Dns.GetHostAddresses(hostname);

            m_Socket.BeginConnect(IPs, port, new AsyncCallback(ConnectCallback), m_Socket);
        }
        catch (SocketException)
        {
            m_Socket.Close();
            OnConnect(false, "Unable to connect to server.");
        }
    }

    private void ConnectCallback(IAsyncResult ar)
    {
        try
        {
            Socket socket = (Socket)ar.AsyncState;

            socket.EndConnect(ar);

            OnConnect(true, "Successfully connected to server.");

            m_ReadThread = new Thread(new ThreadStart(this.ReadThread));
            m_ReadThread.Name = "Read Thread";
            m_ReadThread.IsBackground = true;
            m_ReadThread.Start();
        }
        catch (SocketException)
        {
            m_Socket.Close();
            OnConnect(false, "Unable to connect to server.");
        }
    }

    void ReadThread()
    {
        NetworkStream networkStream = new NetworkStream(m_Socket);
        StreamReader reader = new StreamReader(networkStream);

        while (true)
        {
            try
            {
                String message = reader.ReadLine();

                // To keep the code thread-safe, enqueue a task in the CLient class thread to parse the message received.
                EnqueueTask(new ServerMessageEvent(message));
            }
            catch (IOException)
            {
                // The code will reach here if the server disconnects from the client. Make sure to cleanly shutdown...
                Disconnect();
                break;
            }
        }
    }

    ... Code for sending/parsing the message in the Client class thread.
}
...