Редактировать: Большое спасибо @oleksa, но были и другие проблемы в коде, размещенном ниже. При использовании методов Socket.ReceiveFromAsyn c и Socket.SendToAsyn c передаваемый объект SocketAsyncEventArgs ДОЛЖЕН иметь свое свойство RemoteEndPoint (предпочтительно для RemoteEndPoint клиентского сокета). Несмотря на то, что в документах указано, что это свойство игнорируется, оно все равно должно быть установлено, иначе возникнет исключительная ситуация.
Согласно документации Microsoft, можно использовать методы socket.SendToAsyn c и socket.ReceiveFromAsyn c. с «протоколами, ориентированными на соединение» ( SendToAsyn c Docs ) и «сокеты в стиле байтового потока» ( ReceiveFromAsyn c Docs ) соответственно.
My вопрос в том, поддерживают ли эти методы TCP, и если да, то какие предварительные условия необходимы для начала отправки и получения данных?
Я добавил примеры кода, чтобы лучше объяснить свою проблему. Я не могу заставить обработчик клиента отображать байты, полученные от клиента. При использовании синхронных методов SendTo и принятого клиентского сокета это работает без проблем, и я могу отображать байты. При использовании асинхронных версий (которые работают безупречно с UDP) клиентский обработчик никогда не получает никаких байтов. Это поведение аналогично при использовании сокета прослушивателя сервера или принятого сокета клиента (из SocketOperations.AcceptAsyn c).
Код:
public async Task ClientHandler(object clientArgsObj)
{
SocketAsyncEventArgs clientArgs = (SocketAsyncEventArgs)clientArgsObj;
byte[] receiveBuffer = new byte[Constants.PacketSize];
Memory<byte> receiveBufferMemory = new Memory<byte>(receiveBuffer);
Socket clientSocket = clientArgs.AcceptSocket;
EndPoint remoteEndPoint = clientArgs.AcceptSocket.RemoteEndPoint;
while (true)
{
ReceiveResult result = await ReceiveAsync(clientArgs, SocketFlags.None, receiveBufferMemory);
Console.WriteLine($"[{result.RemoteEndPoint} > Server] : {Encoding.UTF8.GetString(result.Contents.Span)}");
int sentBytes = await SendAsync(result.ClientArgs, SocketFlags.None, receiveBufferMemory);
Console.WriteLine($"[Server > {result.RemoteEndPoint}] Sent {sentBytes} bytes to {result.RemoteEndPoint}");
// This bottom stuff works just fine
//int receivedBytes = clientSocket.ReceiveFrom(receiveBuffer, SocketFlags.None, ref remoteEndPoint);
//int sentBytes = clientSocket.SendTo(receiveBuffer, receivedBytes, SocketFlags.None, remoteEndPoint);
}
}
/// <inheritdoc />
public override async Task StartAsync()
{
socket.Listen(5);
while (true)
{
SocketAsyncEventArgs clientArgs = await SocketOperations.AcceptAsync(socket);
await Task.Factory.StartNew(ClientHandler, clientArgs, CancellationToken.None,
TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
}
private Task<ReceiveResult> ReceiveAsync(SocketAsyncEventArgs args, SocketFlags socketFlags, Memory<byte> outputBuffer, CancellationToken cancellationToken = default)
{
return SocketOperations.ReceiveAsync(args.AcceptSocket, args, socketFlags, outputBuffer, cancellationToken);
//return SocketOperations.ReceiveAsync(socket, args, socketFlags, outputBuffer, cancellationToken);
}
private Task<int> SendAsync(SocketAsyncEventArgs args, SocketFlags socketFlags,
Memory<byte> inputBuffer, CancellationToken cancellationToken = default)
{
return SocketOperations.SendAsync(args.AcceptSocket, args, socketFlags, inputBuffer, cancellationToken);
//return SocketOperations.SendAsync(socket, args, socketFlags, inputBuffer, cancellationToken);
}
public sealed class AsyncSocketOperations
{
private const int MaximumPooledObjects = 10;
private readonly ObjectPool<SocketAsyncEventArgs> connectAsyncArgsPool;
private readonly ArrayPool<byte> receiveBufferPool;
private readonly ArrayPool<byte> receiveFromBufferPool;
private readonly ArrayPool<byte> sendBufferPool;
private readonly ArrayPool<byte> sendToBufferPool;
private readonly ObjectPool<SocketAsyncEventArgs> socketAsyncArgsPool;
private void HandleIOCompleted(object? sender, SocketAsyncEventArgs eventArgs)
{
bool closed = false;
switch (eventArgs.LastOperation)
{
case SocketAsyncOperation.SendTo:
AsyncWriteToken asyncSendToToken = (AsyncWriteToken)eventArgs.UserToken;
if (asyncSendToToken.CancellationToken.IsCancellationRequested)
{
asyncSendToToken.CompletionSource.SetCanceled();
}
else
{
if (eventArgs.SocketError != SocketError.Success)
{
asyncSendToToken.CompletionSource.SetException(
new SocketException((int)eventArgs.SocketError));
}
else
{
asyncSendToToken.CompletionSource.SetResult(eventArgs.BytesTransferred);
}
}
sendToBufferPool.Return(asyncSendToToken.RentedBuffer, true);
break;
case SocketAsyncOperation.ReceiveFrom:
AsyncReadToken asyncReceiveFromToken = (AsyncReadToken)eventArgs.UserToken;
if (asyncReceiveFromToken.CancellationToken.IsCancellationRequested)
{
asyncReceiveFromToken.CompletionSource.SetCanceled();
}
else
{
if (eventArgs.SocketError != SocketError.Success)
{
asyncReceiveFromToken.CompletionSource.SetException(
new SocketException((int)eventArgs.SocketError));
}
else
{
eventArgs.MemoryBuffer.CopyTo(asyncReceiveFromToken.UserBuffer);
ReceiveResult result = new ReceiveResult(eventArgs, asyncReceiveFromToken.UserBuffer,
eventArgs.BytesTransferred, eventArgs.RemoteEndPoint);
asyncReceiveFromToken.CompletionSource.SetResult(result);
}
}
receiveFromBufferPool.Return(asyncReceiveFromToken.RentedBuffer, true);
break;
case SocketAsyncOperation.Disconnect:
closed = true;
break;
case SocketAsyncOperation.Accept:
AsyncAcceptToken asyncAcceptToken = (AsyncAcceptToken)eventArgs.UserToken;
if (asyncAcceptToken.CancellationToken.IsCancellationRequested)
{
asyncAcceptToken.CompletionSource.SetCanceled();
}
else
{
if (eventArgs.SocketError != SocketError.Success)
{
asyncAcceptToken.CompletionSource.SetException(
new SocketException((int)eventArgs.SocketError));
}
else
{
asyncAcceptToken.CompletionSource.SetResult(eventArgs);
}
}
connectAsyncArgsPool.Return(eventArgs);
break;
case SocketAsyncOperation.Connect:
case SocketAsyncOperation.ReceiveMessageFrom:
case SocketAsyncOperation.SendPackets:
case SocketAsyncOperation.None:
throw new NotImplementedException();
default:
throw new ArgumentOutOfRangeException();
}
if (closed)
{
// handle the client closing the connection on tcp servers at some point
}
}
private readonly struct AsyncAcceptToken
{
public readonly CancellationToken CancellationToken;
public readonly TaskCompletionSource<SocketAsyncEventArgs> CompletionSource;
public AsyncAcceptToken(TaskCompletionSource<SocketAsyncEventArgs> tcs, CancellationToken cancellationToken = default)
{
CompletionSource = tcs;
CancellationToken = cancellationToken;
}
}
private readonly struct AsyncReadToken
{
public readonly CancellationToken CancellationToken;
public readonly TaskCompletionSource<ReceiveResult> CompletionSource;
public readonly byte[] RentedBuffer;
public readonly Memory<byte> UserBuffer;
public AsyncReadToken(byte[] rentedBuffer, Memory<byte> userBuffer, TaskCompletionSource<ReceiveResult> tcs,
CancellationToken cancellationToken = default)
{
RentedBuffer = rentedBuffer;
UserBuffer = userBuffer;
CompletionSource = tcs;
CancellationToken = cancellationToken;
}
}
private readonly struct AsyncWriteToken
{
public readonly CancellationToken CancellationToken;
public readonly TaskCompletionSource<int> CompletionSource;
public readonly byte[] RentedBuffer;
public AsyncWriteToken(byte[] rentedBuffer, TaskCompletionSource<int> tcs,
CancellationToken cancellationToken = default)
{
RentedBuffer = rentedBuffer;
CompletionSource = tcs;
CancellationToken = cancellationToken;
}
}
public AsyncSocketOperations(int bufferSize, int maxPooledObjectCount = MaximumPooledObjects, bool preallocateBuffers = false)
{
MaxPooledObjects = maxPooledObjectCount;
BufferSize = bufferSize;
sendBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
receiveBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
sendToBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
receiveFromBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
connectAsyncArgsPool = new LeakTrackingObjectPool<SocketAsyncEventArgs>(
new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(),
maxPooledObjectCount));
socketAsyncArgsPool = new LeakTrackingObjectPool<SocketAsyncEventArgs>(
new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(),
maxPooledObjectCount));
for (int i = 0; i < MaxPooledObjects; i++)
{
SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs();
connectArgs.Completed += HandleIOCompleted;
connectAsyncArgsPool.Return(connectArgs);
SocketAsyncEventArgs socketArgs = new SocketAsyncEventArgs();
socketArgs.Completed += HandleIOCompleted;
socketAsyncArgsPool.Return(socketArgs);
}
if (preallocateBuffers)
{
// TODO: Allocate and return array pool buffers
}
}
public int BufferSize { get; }
public int MaxPooledObjects { get; }
public Task<SocketAsyncEventArgs> AcceptAsync(Socket socket, CancellationToken cancellationToken = default)
{
TaskCompletionSource<SocketAsyncEventArgs> tcs = new TaskCompletionSource<SocketAsyncEventArgs>();
SocketAsyncEventArgs args = connectAsyncArgsPool.Get();
args.AcceptSocket = null;
args.UserToken = new AsyncAcceptToken(tcs, cancellationToken);
if (socket.AcceptAsync(args)) return tcs.Task;
return Task.FromResult(args);
}
public Task<ReceiveResult> ReceiveFromAsync(Socket socket, SocketAsyncEventArgs args, SocketFlags socketFlags,
Memory<byte> outputBuffer, CancellationToken cancellationToken = default)
{
TaskCompletionSource<ReceiveResult> tcs = new TaskCompletionSource<ReceiveResult>();
byte[] rentedReceiveFromBuffer = receiveFromBufferPool.Rent(BufferSize);
Memory<byte> rentedReceiveFromBufferMemory = new Memory<byte>(rentedReceiveFromBuffer);
args.SetBuffer(rentedReceiveFromBufferMemory);
args.SocketFlags = socketFlags;
args.UserToken = new AsyncReadToken(rentedReceiveFromBuffer, outputBuffer, tcs, cancellationToken);
// if the receive operation doesn't complete synchronously, returns the awaitable task
if (socket.ReceiveFromAsync(args)) return tcs.Task;
args.MemoryBuffer.CopyTo(outputBuffer);
ReceiveResult result = new ReceiveResult(args, outputBuffer, args.BytesTransferred, args.RemoteEndPoint);
receiveFromBufferPool.Return(rentedReceiveFromBuffer, true);
return Task.FromResult(result);
}
public Task<int> SendToAsync(Socket socket, SocketAsyncEventArgs args, SocketFlags socketFlags,
Memory<byte> inputBuffer, CancellationToken cancellationToken = default)
{
TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
byte[] rentedSendToBuffer = sendToBufferPool.Rent(BufferSize);
Memory<byte> rentedSendToBufferMemory = new Memory<byte>(rentedSendToBuffer);
inputBuffer.CopyTo(rentedSendToBufferMemory);
args.SetBuffer(rentedSendToBufferMemory);
args.SocketFlags = socketFlags;
args.UserToken = new AsyncWriteToken(rentedSendToBuffer, tcs, cancellationToken);
// if the send operation doesn't complete synchronously, return the awaitable task
if (socket.SendToAsync(args)) return tcs.Task;
int result = args.BytesTransferred;
sendToBufferPool.Return(rentedSendToBuffer, true);
return Task.FromResult(result);
}