Прокси-сообщения WebSocket между двумя потоками - PullRequest
1 голос
/ 01 мая 2019

У меня есть прокси-сервер HTTP, который действует как посредник.Это в основном делает следующее:

  • Прослушивание запроса клиента-браузера
  • Пересылка запроса на сервер
  • Разбор ответа сервера
  • Пересылкаответ обратно на клиент-браузер

Таким образом, в основном есть один NetworkStream, или даже чаще SslStream между клиентом-браузером и прокси, и еще один между прокси иserver.

Возникло также требование пересылать трафик WebSocket между клиентом и сервером.

Итак, теперь, когда клиент-браузер запрашивает обновление соединения до websocket, а удаленный сервер отвечает:HTTP-код 101, прокси-сервер поддерживает эти соединения для пересылки дальнейших сообщений от клиента к серверу и наоборот.

Таким образом, после того, как прокси-сервер получил сообщение от удаленного сервера о том, что он готов переключать протоколы, оннеобходимо войти в цикл, где клиентские и серверные потоки опрашиваются для данных, и где любые полученные данные пересылаются вдругая сторона.

Проблема

WebSocket позволяет обеим сторонам отправлять сообщения в любое время.Это особенно проблема с управляющими сообщениями, такими как пинг / понг, где любая сторона может отправить пинг в любое время, а другая сторона должна ответить понг своевременноманера.Теперь рассмотрим два экземпляра SslStream, у которых нет свойства DataAvailable, где единственный способ прочитать данные - вызвать Read / ReadAsync, который может не вернуться, пока не будут доступны некоторые данные.Рассмотрим следующий псевдокод:

public async Task GetMessage()
{
    // All these methods that we await read from the source stream
    byte[] firstByte = await GetFirstByte(); // 1-byte buffer
    byte[] messageLengthBytes = await GetMessageLengthBytes();
    uint messageLength = GetMessageLength(messageLengthBytes);
    bool isMessageMasked = DetermineIfMessageMasked(messageLengthBytes);
    byte[] maskBytes;
    if (isMessageMasked)
    {
        maskBytes = await GetMaskBytes();
    }

    byte[] messagePayload = await GetMessagePayload(messageLength);

    // This method writes to the destination stream
    await ComposeAndForwardMessageToOtherParty(firstByte, messageLengthBytes, maskBytes, messagePayload);
}

Приведенный выше псевдокод читает из одного потока и записывает в другой.Проблема заключается в том, что вышеуказанную процедуру нужно запускать для обоих потоков одновременно, потому что мы не знаем, какая сторона отправит сообщение другой в любой данный момент времени.Однако невозможно выполнить операцию записи, пока активна операция чтения.А поскольку у нас нет средств, необходимых для опроса входящих данных, операции чтения должны блокироваться.Это означает, что если мы начнем операции чтения для обоих потоков одновременно, мы можем забыть о записи в них.Один поток в конечном итоге вернет некоторые данные, но мы не сможем отправить эти данные в другой поток, так как он все еще будет занят попыткой чтения.И это может занять некоторое время, по крайней мере, пока сторона, владеющая этим потоком, не отправит запрос ping .

1 Ответ

0 голосов
/ 03 мая 2019

Благодаря комментариям @MarcGravell мы узнали, что независимые операции чтения / записи поддерживаются сетевыми потоками, т.е. NetworkStream действует как два независимых канала - одно чтение, одна запись - полностью дуплекс.

Таким образом, проксирование сообщений WebSocket может быть таким же простым, как запуск двух независимых задач: одна для чтения из потока клиента и записи в поток сервера, а другая для чтения из потока сервера и записи в поток клиента.

Если это может помочь любому, кто ищет его, вот как я это реализовал:

public class WebSocketRequestHandler
{
    private const int MaxMessageLength = 0x7FFFFFFF;

    private const byte LengthBitMask = 0x7F;

    private const byte MaskBitMask = 0x80;

    private delegate Task WriteStreamAsyncDelegate(byte[] buffer, int offset, int count, CancellationToken cancellationToken);

    private delegate Task<byte[]> BufferStreamAsyncDelegate(int count, CancellationToken cancellationToken);

    public async Task HandleWebSocketMessagesAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        var clientListener = ListenForClientMessages(cancellationToken);
        var serverListener = ListenForServerMessages(cancellationToken);
        await Task.WhenAll(clientListener, serverListener);
    }

    private async Task ListenForClientMessages(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await ListenForMessages(YOUR_CLIENT_STREAM_BUFFER_METHOD_DELEGATE, YOUR_SERVER_STREAM_WRITE_METHOD_DELEGATE, cancellationToken);
        }
    }

    private async Task ListenForServerMessages(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await ListenForMessages(YOUR_SERVER_STREAM_BUFFER_METHOD_DELEGATE, YOUR_CLIENT_STREAM_WRITE_METHOD_DELEGATE, cancellationToken);
        }
    }

    private static async Task ListenForMessages(BufferStreamAsyncDelegate sourceStreamReader,
        WriteStreamAsyncDelegate destinationStreamWriter,
        CancellationToken cancellationToken)
    {
        var messageBuilder = new List<byte>();
        var firstByte = await sourceStreamReader(1, cancellationToken);
        messageBuilder.AddRange(firstByte);
        var lengthBytes = await GetLengthBytes(sourceStreamReader, cancellationToken);
        messageBuilder.AddRange(lengthBytes);
        var isMaskBitSet = (lengthBytes[0] & MaskBitMask) != 0;
        var length = GetMessageLength(lengthBytes);
        if (isMaskBitSet)
        {
            var maskBytes = await sourceStreamReader(4, cancellationToken);
            messageBuilder.AddRange(maskBytes);
        }

        var messagePayloadBytes = await sourceStreamReader(length, cancellationToken);
        messageBuilder.AddRange(messagePayloadBytes);
        await destinationStreamWriter(messageBuilder.ToArray(), 0, messageBuilder.Count, cancellationToken);
    }

    private static async Task<byte[]> GetLengthBytes(BufferStreamAsyncDelegate sourceStreamReader, CancellationToken cancellationToken)
    {
        var lengthBytes = new List<byte>();
        var firstLengthByte = await sourceStreamReader(1, cancellationToken);
        lengthBytes.AddRange(firstLengthByte);
        var lengthByteValue = firstLengthByte[0] & LengthBitMask;
        if (lengthByteValue <= 125)
        {
            return lengthBytes.ToArray();
        }

        switch (lengthByteValue)
        {
            case 126:
            {
                var secondLengthBytes = await sourceStreamReader(2, cancellationToken);
                lengthBytes.AddRange(secondLengthBytes);
                return lengthBytes.ToArray();
            }
            case 127:
            {
                var secondLengthBytes = await sourceStreamReader(8, cancellationToken);
                lengthBytes.AddRange(secondLengthBytes);
                return lengthBytes.ToArray();
            }
            default:
                throw new Exception($"Unexpected first length byte value: {lengthByteValue}");
        }
    }

    private static int GetMessageLength(byte[] lengthBytes)
    {
        byte[] subArray;
        switch (lengthBytes.Length)
        {
            case 1:
                return lengthBytes[0] & LengthBitMask;

            case 3:
                if (!BitConverter.IsLittleEndian)
                {
                    return BitConverter.ToUInt16(lengthBytes, 1);
                }

                subArray = lengthBytes.SubArray(1, 2);
                Array.Reverse(subArray);
                return BitConverter.ToUInt16(subArray, 0);

            case 9:
                subArray = lengthBytes.SubArray(1, 8);
                Array.Reverse(subArray);
                var retVal = BitConverter.ToUInt64(subArray, 0);
                if (retVal > MaxMessageLength)
                {
                    throw new Exception($"Unexpected payload length: {retVal}");
                }

                return (int) retVal;

            default:
                throw new Exception($"Impossibru!!1 The length of lengthBytes array was: '{lengthBytes.Length}'");
        }
    }
}

Его можно использовать, просто позвонив await handler.HandleWebSocketMessagesAsync(cancellationToken) после того, как начальное рукопожатие было выполнено.

Метод SubArray взят отсюда: https://stackoverflow.com/a/943650/828023 (также из @Marc haha)

...