Поддерживают ли Socket.SendToAsyn c и Socket.ReceiveFromAsyn c протокол TCP? - PullRequest
0 голосов
/ 02 марта 2020

Редактировать: Большое спасибо @oleksa, но были и другие проблемы в коде, размещенном ниже. При использовании методов Socket.ReceiveFromAsyn c и Socket.SendToAsyn c передаваемый объект SocketAsyncEventArgs ДОЛЖЕН иметь свое свойство RemoteEndPoint (предпочтительно для RemoteEndPoint клиентского сокета). Несмотря на то, что в документах указано, что это свойство игнорируется, оно все равно должно быть установлено, иначе возникнет исключительная ситуация.

Согласно документации Microsoft, можно использовать методы socket.SendToAsyn c и socket.ReceiveFromAsyn c. с «протоколами, ориентированными на соединение» ( SendToAsyn c Docs ) и «сокеты в стиле байтового потока» ( ReceiveFromAsyn c Docs ) соответственно.

My вопрос в том, поддерживают ли эти методы TCP, и если да, то какие предварительные условия необходимы для начала отправки и получения данных?

Я добавил примеры кода, чтобы лучше объяснить свою проблему. Я не могу заставить обработчик клиента отображать байты, полученные от клиента. При использовании синхронных методов SendTo и принятого клиентского сокета это работает без проблем, и я могу отображать байты. При использовании асинхронных версий (которые работают безупречно с UDP) клиентский обработчик никогда не получает никаких байтов. Это поведение аналогично при использовании сокета прослушивателя сервера или принятого сокета клиента (из SocketOperations.AcceptAsyn c).

Код:

        public async Task ClientHandler(object clientArgsObj)
        {
            SocketAsyncEventArgs clientArgs = (SocketAsyncEventArgs)clientArgsObj;

            byte[] receiveBuffer = new byte[Constants.PacketSize];
            Memory<byte> receiveBufferMemory = new Memory<byte>(receiveBuffer);

            Socket clientSocket = clientArgs.AcceptSocket;
            EndPoint remoteEndPoint = clientArgs.AcceptSocket.RemoteEndPoint;

            while (true)
            {
                ReceiveResult result = await ReceiveAsync(clientArgs, SocketFlags.None, receiveBufferMemory);

                Console.WriteLine($"[{result.RemoteEndPoint} > Server] : {Encoding.UTF8.GetString(result.Contents.Span)}");

                int sentBytes = await SendAsync(result.ClientArgs, SocketFlags.None, receiveBufferMemory);

                Console.WriteLine($"[Server > {result.RemoteEndPoint}] Sent {sentBytes} bytes to {result.RemoteEndPoint}");

                // This bottom stuff works just fine
                //int receivedBytes = clientSocket.ReceiveFrom(receiveBuffer, SocketFlags.None, ref remoteEndPoint);
                //int sentBytes = clientSocket.SendTo(receiveBuffer, receivedBytes, SocketFlags.None, remoteEndPoint);
            }
        }

        /// <inheritdoc />
        public override async Task StartAsync()
        {
            socket.Listen(5);

            while (true)
            {
                SocketAsyncEventArgs clientArgs = await SocketOperations.AcceptAsync(socket);

                await Task.Factory.StartNew(ClientHandler, clientArgs, CancellationToken.None,
                               TaskCreationOptions.LongRunning, TaskScheduler.Default);
            }
        }

        private Task<ReceiveResult> ReceiveAsync(SocketAsyncEventArgs args, SocketFlags socketFlags, Memory<byte> outputBuffer, CancellationToken cancellationToken = default)
        {
            return SocketOperations.ReceiveAsync(args.AcceptSocket, args, socketFlags, outputBuffer, cancellationToken);
            //return SocketOperations.ReceiveAsync(socket, args, socketFlags, outputBuffer, cancellationToken);
        }

        private Task<int> SendAsync(SocketAsyncEventArgs args, SocketFlags socketFlags,
            Memory<byte> inputBuffer, CancellationToken cancellationToken = default)
        {
            return SocketOperations.SendAsync(args.AcceptSocket, args, socketFlags, inputBuffer, cancellationToken);
            //return SocketOperations.SendAsync(socket, args, socketFlags, inputBuffer, cancellationToken);
        }

public sealed class AsyncSocketOperations
    {
        private const int MaximumPooledObjects = 10;

        private readonly ObjectPool<SocketAsyncEventArgs> connectAsyncArgsPool;
        private readonly ArrayPool<byte> receiveBufferPool;
        private readonly ArrayPool<byte> receiveFromBufferPool;
        private readonly ArrayPool<byte> sendBufferPool;
        private readonly ArrayPool<byte> sendToBufferPool;
        private readonly ObjectPool<SocketAsyncEventArgs> socketAsyncArgsPool;

        private void HandleIOCompleted(object? sender, SocketAsyncEventArgs eventArgs)
        {
            bool closed = false;

            switch (eventArgs.LastOperation)
            {
                case SocketAsyncOperation.SendTo:
                    AsyncWriteToken asyncSendToToken = (AsyncWriteToken)eventArgs.UserToken;

                    if (asyncSendToToken.CancellationToken.IsCancellationRequested)
                    {
                        asyncSendToToken.CompletionSource.SetCanceled();
                    }
                    else
                    {
                        if (eventArgs.SocketError != SocketError.Success)
                        {
                            asyncSendToToken.CompletionSource.SetException(
                                new SocketException((int)eventArgs.SocketError));
                        }
                        else
                        {
                            asyncSendToToken.CompletionSource.SetResult(eventArgs.BytesTransferred);
                        }
                    }

                    sendToBufferPool.Return(asyncSendToToken.RentedBuffer, true);

                    break;

                case SocketAsyncOperation.ReceiveFrom:
                    AsyncReadToken asyncReceiveFromToken = (AsyncReadToken)eventArgs.UserToken;

                    if (asyncReceiveFromToken.CancellationToken.IsCancellationRequested)
                    {
                        asyncReceiveFromToken.CompletionSource.SetCanceled();
                    }
                    else
                    {
                        if (eventArgs.SocketError != SocketError.Success)
                        {
                            asyncReceiveFromToken.CompletionSource.SetException(
                                new SocketException((int)eventArgs.SocketError));
                        }
                        else
                        {
                            eventArgs.MemoryBuffer.CopyTo(asyncReceiveFromToken.UserBuffer);
                            ReceiveResult result = new ReceiveResult(eventArgs, asyncReceiveFromToken.UserBuffer,
                                eventArgs.BytesTransferred, eventArgs.RemoteEndPoint);

                            asyncReceiveFromToken.CompletionSource.SetResult(result);
                        }
                    }

                    receiveFromBufferPool.Return(asyncReceiveFromToken.RentedBuffer, true);

                    break;

                case SocketAsyncOperation.Disconnect:
                    closed = true;
                    break;

                case SocketAsyncOperation.Accept:
                    AsyncAcceptToken asyncAcceptToken = (AsyncAcceptToken)eventArgs.UserToken;

                    if (asyncAcceptToken.CancellationToken.IsCancellationRequested)
                    {
                        asyncAcceptToken.CompletionSource.SetCanceled();
                    }
                    else
                    {
                        if (eventArgs.SocketError != SocketError.Success)
                        {
                            asyncAcceptToken.CompletionSource.SetException(
                                new SocketException((int)eventArgs.SocketError));
                        }
                        else
                        {
                            asyncAcceptToken.CompletionSource.SetResult(eventArgs);
                        }
                    }

                    connectAsyncArgsPool.Return(eventArgs);

                    break;

                case SocketAsyncOperation.Connect:
                case SocketAsyncOperation.ReceiveMessageFrom:
                case SocketAsyncOperation.SendPackets:
                case SocketAsyncOperation.None:
                    throw new NotImplementedException();

                default:
                    throw new ArgumentOutOfRangeException();
            }

            if (closed)
            {
                // handle the client closing the connection on tcp servers at some point
            }
        }

        private readonly struct AsyncAcceptToken
        {
            public readonly CancellationToken CancellationToken;
            public readonly TaskCompletionSource<SocketAsyncEventArgs> CompletionSource;

            public AsyncAcceptToken(TaskCompletionSource<SocketAsyncEventArgs> tcs, CancellationToken cancellationToken = default)
            {
                CompletionSource = tcs;
                CancellationToken = cancellationToken;
            }
        }

        private readonly struct AsyncReadToken
        {
            public readonly CancellationToken CancellationToken;
            public readonly TaskCompletionSource<ReceiveResult> CompletionSource;
            public readonly byte[] RentedBuffer;
            public readonly Memory<byte> UserBuffer;

            public AsyncReadToken(byte[] rentedBuffer, Memory<byte> userBuffer, TaskCompletionSource<ReceiveResult> tcs,
                CancellationToken cancellationToken = default)
            {
                RentedBuffer = rentedBuffer;
                UserBuffer = userBuffer;

                CompletionSource = tcs;
                CancellationToken = cancellationToken;
            }
        }

        private readonly struct AsyncWriteToken
        {
            public readonly CancellationToken CancellationToken;
            public readonly TaskCompletionSource<int> CompletionSource;
            public readonly byte[] RentedBuffer;

            public AsyncWriteToken(byte[] rentedBuffer, TaskCompletionSource<int> tcs,
                CancellationToken cancellationToken = default)
            {
                RentedBuffer = rentedBuffer;

                CompletionSource = tcs;
                CancellationToken = cancellationToken;
            }
        }

        public AsyncSocketOperations(int bufferSize, int maxPooledObjectCount = MaximumPooledObjects, bool preallocateBuffers = false)
        {
            MaxPooledObjects = maxPooledObjectCount;
            BufferSize = bufferSize;

            sendBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
            receiveBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);

            sendToBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
            receiveFromBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);

            connectAsyncArgsPool = new LeakTrackingObjectPool<SocketAsyncEventArgs>(
                new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(),
                    maxPooledObjectCount));

            socketAsyncArgsPool = new LeakTrackingObjectPool<SocketAsyncEventArgs>(
                new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(),
                    maxPooledObjectCount));

            for (int i = 0; i < MaxPooledObjects; i++)
            {
                SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs();
                connectArgs.Completed += HandleIOCompleted;
                connectAsyncArgsPool.Return(connectArgs);

                SocketAsyncEventArgs socketArgs = new SocketAsyncEventArgs();
                socketArgs.Completed += HandleIOCompleted;
                socketAsyncArgsPool.Return(socketArgs);
            }

            if (preallocateBuffers)
            {
                // TODO: Allocate and return array pool buffers
            }
        }

        public int BufferSize { get; }

        public int MaxPooledObjects { get; }

        public Task<SocketAsyncEventArgs> AcceptAsync(Socket socket, CancellationToken cancellationToken = default)
        {
            TaskCompletionSource<SocketAsyncEventArgs> tcs = new TaskCompletionSource<SocketAsyncEventArgs>();

            SocketAsyncEventArgs args = connectAsyncArgsPool.Get();

            args.AcceptSocket = null;
            args.UserToken = new AsyncAcceptToken(tcs, cancellationToken);

            if (socket.AcceptAsync(args)) return tcs.Task;

            return Task.FromResult(args);
        }

        public Task<ReceiveResult> ReceiveFromAsync(Socket socket, SocketAsyncEventArgs args, SocketFlags socketFlags,
            Memory<byte> outputBuffer, CancellationToken cancellationToken = default)
        {
            TaskCompletionSource<ReceiveResult> tcs = new TaskCompletionSource<ReceiveResult>();

            byte[] rentedReceiveFromBuffer = receiveFromBufferPool.Rent(BufferSize);
            Memory<byte> rentedReceiveFromBufferMemory = new Memory<byte>(rentedReceiveFromBuffer);

            args.SetBuffer(rentedReceiveFromBufferMemory);
            args.SocketFlags = socketFlags;
            args.UserToken = new AsyncReadToken(rentedReceiveFromBuffer, outputBuffer, tcs, cancellationToken);

            // if the receive operation doesn't complete synchronously, returns the awaitable task
            if (socket.ReceiveFromAsync(args)) return tcs.Task;

            args.MemoryBuffer.CopyTo(outputBuffer);

            ReceiveResult result = new ReceiveResult(args, outputBuffer, args.BytesTransferred, args.RemoteEndPoint);

            receiveFromBufferPool.Return(rentedReceiveFromBuffer, true);

            return Task.FromResult(result);
        }

        public Task<int> SendToAsync(Socket socket, SocketAsyncEventArgs args, SocketFlags socketFlags,
            Memory<byte> inputBuffer, CancellationToken cancellationToken = default)
        {
            TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();

            byte[] rentedSendToBuffer = sendToBufferPool.Rent(BufferSize);
            Memory<byte> rentedSendToBufferMemory = new Memory<byte>(rentedSendToBuffer);

            inputBuffer.CopyTo(rentedSendToBufferMemory);

            args.SetBuffer(rentedSendToBufferMemory);
            args.SocketFlags = socketFlags;
            args.UserToken = new AsyncWriteToken(rentedSendToBuffer, tcs, cancellationToken);

            // if the send operation doesn't complete synchronously, return the awaitable task
            if (socket.SendToAsync(args)) return tcs.Task;

            int result = args.BytesTransferred;

            sendToBufferPool.Return(rentedSendToBuffer, true);

            return Task.FromResult(result);
        }

1 Ответ

1 голос
/ 02 марта 2020

Я считаю, что оба метода SendToAsync и ReceiveFromAsync являются просто неблокирующими копиями методов SendTo и ReceiveFrom

SendTo do c имеет образец с ProtocolType.Udp но вы также можете использовать ProtocolType.Tcp.

Вы должны запустить TCP-сервер с ReceiveFromAsync, а затем отправить данные с помощью SendToAsync. Пожалуйста, проверьте образцы MS SendTo и ReceiveFrom и обновите их до версии asyn c соответственно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...