Существуют ли хорошо известные шаблоны для асинхронного сетевого кода в C #? - PullRequest
6 голосов
/ 15 марта 2009

Я недавно написал простой и понятный прокси-сервер для проверки концепции на C # в рамках попытки заставить веб-приложение Java взаимодействовать с устаревшим приложением VB6, расположенным на другом сервере. Это смехотворно просто:

Прокси-сервер и клиенты используют один и тот же формат сообщения; в коде я использую класс ProxyMessage для представления как запросов от клиентов, так и ответов, генерируемых сервером:

public class ProxyMessage
{
   int Length; // message length (not including the length bytes themselves)
   string Body; // an XML string containing a request/response

   // writes this message instance in the proper network format to stream 
   // (helper for response messages)
   WriteToStream(Stream stream) { ... }
}

Сообщения настолько просты, насколько это возможно: длина тела + тело сообщения.

У меня есть отдельный класс ProxyClient, представляющий соединение с клиентом. Он обрабатывает все взаимодействие между прокси и одним клиентом.

Что мне интересно, так это то, что они представляют собой шаблоны проектирования или передовые методы для упрощения стандартного кода, связанного с асинхронным программированием сокетов? Например, вам необходимо позаботиться о том, чтобы управлять буфером чтения, чтобы случайно не потерять байты и отслеживать, как далеко вы продвинулись в обработке текущего сообщения. В моем текущем коде я выполняю всю эту работу в своей функции обратного вызова для TcpClient.BeginRead и управляю состоянием буфера и текущим состоянием обработки сообщений с помощью нескольких переменных экземпляра.

Ниже приведен код моей функции обратного вызова, которую я передаю BeginRead, вместе с соответствующими переменными экземпляра для контекста. Кажется, код работает нормально «как есть», но мне интересно, можно ли его немного реорганизовать, чтобы сделать его более понятным (или, может быть, он уже есть?).

private enum BufferStates 
{ 
    GetMessageLength, 
    GetMessageBody 
}
// The read buffer. Initially 4 bytes because we are initially
// waiting to receive the message length (a 32-bit int) from the client 
// on first connecting. By constraining the buffer length to exactly 4 bytes,
// we make the buffer management a bit simpler, because
// we don't have to worry about cases where the buffer might contain
// the message length plus a few bytes of the message body.
// Additional bytes will simply be buffered by the OS until we request them.
byte[] _buffer = new byte[4];

// A count of how many bytes read so far in a particular BufferState.
int _totalBytesRead = 0;

// The state of the our buffer processing. Initially, we want
// to read in the message length, as it's the first thing
// a client will send
BufferStates _bufferState = BufferStates.GetMessageLength;

// ...ADDITIONAL CODE OMITTED FOR BREVITY...

// This is called every time we receive data from
// the client.

private void ReadCallback(IAsyncResult ar)
{
    try
    {
        int bytesRead = _tcpClient.GetStream().EndRead(ar);

        if (bytesRead == 0)
        {
            // No more data/socket was closed.
            this.Dispose();
            return;
        }

        // The state passed to BeginRead is used to hold a ProxyMessage
        // instance that we use to build to up the message 
        // as it arrives.
        ProxyMessage message = (ProxyMessage)ar.AsyncState;

        if(message == null)
            message = new ProxyMessage();

        switch (_bufferState)
        {
            case BufferStates.GetMessageLength:

                _totalBytesRead += bytesRead;

                // if we have the message length (a 32-bit int)
                // read it in from the buffer, grow the buffer
                // to fit the incoming message, and change
                // state so that the next read will start appending
                // bytes to the message body

                if (_totalBytesRead == 4)
                {
                    int length = BitConverter.ToInt32(_buffer, 0);
                    message.Length = length;
                    _totalBytesRead = 0;
                    _buffer = new byte[message.Length];
                    _bufferState = BufferStates.GetMessageBody;
                }

                break;

            case BufferStates.GetMessageBody:

                string bodySegment = Encoding.ASCII.GetString(_buffer, _totalBytesRead, bytesRead);
                _totalBytesRead += bytesRead;

                message.Body += bodySegment;

                if (_totalBytesRead >= message.Length)
                {
                    // Got a complete message.
                    // Notify anyone interested.

                    // Pass a response ProxyMessage object to 
                    // with the event so that receivers of OnReceiveMessage
                    // can send a response back to the client after processing
                    // the request.
                    ProxyMessage response = new ProxyMessage();
                    OnReceiveMessage(this, new ProxyMessageEventArgs(message, response));
                    // Send the response to the client
                    response.WriteToStream(_tcpClient.GetStream());

                    // Re-initialize our state so that we're
                    // ready to receive additional requests...
                    message = new ProxyMessage();
                    _totalBytesRead = 0;
                    _buffer = new byte[4]; //message length is 32-bit int (4 bytes)
                    _bufferState = BufferStates.GetMessageLength;
                }

                break;
        }

        // Wait for more data...
        _tcpClient.GetStream().BeginRead(_buffer, 0, _buffer.Length, this.ReadCallback, message);
    }
    catch
    {
        // do nothing
    }

}

Пока что моя единственная реальная мысль - это выделить связанные с буфером вещи в отдельный класс MessageBuffer и просто сделать так, чтобы мой обратный вызов чтения добавил к нему новые байты по мере их поступления. Затем MessageBuffer будет беспокоиться о таких вещах, как текущий BufferState, и вызовет событие, когда получит полное сообщение, которое ProxyClient сможет затем распространить дальше до кода основного прокси-сервера, где запрос может быть обработан.

Ответы [ 4 ]

2 голосов
/ 15 марта 2009

Мне пришлось преодолеть подобные проблемы. Вот мое решение (измененное в соответствии с вашим собственным примером).

Мы создаем оболочку вокруг Stream (суперкласс NetworkStream, который является суперклассом TcpClient или чем-то еще) Мониторы читает. Когда некоторые данные читаются, они буферизируются. Когда мы получаем индикатор длины (4 байта), мы проверяем, есть ли у нас полное сообщение (4 байта + длина тела сообщения). Когда мы это делаем, мы генерируем событие MessageReceived с телом сообщения и удаляем сообщение из буфера. Этот метод автоматически обрабатывает фрагментированные сообщения и ситуации с несколькими сообщениями на пакет.

public class MessageStream : IMessageStream, IDisposable
{
    public MessageStream(Stream stream)
    {
        if(stream == null)
            throw new ArgumentNullException("stream", "Stream must not be null");

        if(!stream.CanWrite || !stream.CanRead)
            throw new ArgumentException("Stream must be readable and writable", "stream");

        this.Stream = stream;
        this.readBuffer = new byte[512];
        messageBuffer = new List<byte>();
        stream.BeginRead(readBuffer, 0, readBuffer.Length, new AsyncCallback(ReadCallback), null);
    }

    // These belong to the ReadCallback thread only.
    private byte[] readBuffer;
    private List<byte> messageBuffer;

    private void ReadCallback(IAsyncResult result)
    {
        int bytesRead = Stream.EndRead(result);
        messageBuffer.AddRange(readBuffer.Take(bytesRead));

        if(messageBuffer.Count >= 4)
        {
            int length = BitConverter.ToInt32(messageBuffer.Take(4).ToArray(), 0);  // 4 bytes per int32

            // Keep buffering until we get a full message.

            if(messageBuffer.Count >= length + 4)
            {
                messageBuffer.Skip(4);
                OnMessageReceived(new MessageEventArgs(messageBuffer.Take(length)));
                messageBuffer.Skip(length);
            }
        }

        // FIXME below is kinda hacky (I don't know the proper way of doing things...)

        // Don't bother reading again.  We don't have stream access.
        if(disposed)
            return;

        try
        {
            Stream.BeginRead(readBuffer, 0, readBuffer.Length, new AsyncCallback(ReadCallback), null);
        }
        catch(ObjectDisposedException)
        {
            // DO NOTHING
            // Ends read loop.
        }
    }

    public Stream Stream
    {
        get;
        private set;
    }

    public event EventHandler<MessageEventArgs> MessageReceived;

    protected virtual void OnMessageReceived(MessageEventArgs e)
    {
        var messageReceived = MessageReceived;

        if(messageReceived != null)
            messageReceived(this, e);
    }

    public virtual void SendMessage(Message message)
    {
        // Have fun ...
    }

    // Dispose stuff here
}
1 голос
/ 16 марта 2009

Нет ничего плохого в том, как ты это сделал. Для меня, однако, мне нравится отделять получение данных от их обработки, о чем вы, кажется, думаете с вашим предложенным классом MessageBuffer. Я обсуждал это подробно здесь .

1 голос
/ 15 марта 2009

Вы можете использовать yield return для автоматизации генерации конечного автомата для асинхронных обратных вызовов. Джеффри Рихтер продвигает эту технику в своем классе AsyncEnumerator , и я поиграл с идеей здесь .

1 голос
/ 15 марта 2009

Я думаю, что дизайн, который вы использовали, хорош, примерно так, как я бы и сделал то же самое. Я не думаю, что вы бы много выиграли, если бы рефакторинг превратился в дополнительные классы / структуры, и из того, что я видел, вы бы на самом деле сделали решение более сложным.

Единственный комментарий, который я хотел бы сказать, относительно того, достаточно ли устойчивы два чтения, где первое всегда имеет длину мессы, а второе всегда является телом. Я всегда настороженно отношусь к таким подходам, так как если они каким-то образом не синхронизируются из-за непредвиденных обстоятельств (например, когда другой конец посылает неправильную длину), восстановить очень сложно. Вместо этого я сделал бы одно чтение с большим буфером, чтобы я всегда получал все доступные данные из сети, а затем проверял буфер, чтобы извлечь полные сообщения. Таким образом, если что-то пойдет не так, текущий буфер можно просто выбросить, чтобы вернуть все в чистое состояние, и будут потеряны только текущие сообщения, а не остановка всей службы.

На самом деле в данный момент у вас возникнет проблема, если ваше тело сообщения будет большим и получено за два отдельных приема, а следующее сообщение в строке отправит его длину одновременно со второй половиной предыдущего тела. Если это произойдет, длина вашего сообщения в конечном итоге будет добавлена ​​к телу предыдущего сообщения, и вы окажетесь в ситуации, описанной в предыдущем абзаце.

...