Я реализую небольшую библиотеку, чтобы упростить использование System.Net.Sockets.Socket
. Он должен обрабатывать произвольное количество прослушивающих и принимающих TCP-сокетов, и важно, чтобы реализация была быстрой ~.
Я использую методы XXXAsync
и CLR ThreadPool
для обратного вызова делегатов пользователя библиотеки (например, всякий раз, когда сообщение было успешно отправлено или были получены некоторые данные). Предпочтительно, чтобы библиотека сама по себе не запускала никаких потоков.
Пользователь библиотеки может получить доступ к интерфейсу моей оболочки Sockets
, чтобы отправить сообщение или начать получать сообщения (среди многих других методов и перегрузок):
public interface IClientSocket {
// will eventually call Socket.SendAsync
IMessageHandle SendAsync(byte[] buffer, int offset, int length);
// will eventually call Socket.RecieveAsync
void StartReceiveAsync();
}
Методы XXXAsync
используют порты завершения ввода-вывода. Таким образом, поток, который вызывает эти методы, должен оставаться в живых, пока операция не завершится, в противном случае операция завершится с SocketError.OperationAborted
(я думаю, что это так, или нет?).
Наложение такого ограничения на пользователя библиотеки уродливо и подвержено ошибкам.
Какая лучшая альтернатива здесь?
Вызов ThreadPool.QueueUserWorkItem
с делегатом для вызова методов XXXAsync
? Это безопасно? Я где-то читал, что ThreadPool не будет останавливать бездействующие потоки, которые имеют IOCP. Это было бы хорошо, поскольку это решает проблему выше.
Но это также может быть плохо со многими TCP-соединениями.
В таком случае вполне вероятно, что каждый поток ThreadPool вызвал один из ожидающих вызовов ReceiveAsync
. Поэтому ThreadPool никогда не будет сокращаться, даже если текущая рабочая нагрузка мала, а многие потоки простаивают (и тратят память).
Запуск выделенного потока, который всегда активен и вызывает методы XXXAsync
. Например, когда пользователь библиотеки хочет отправить данные, он помещает делегата в синхронизированную очередь, поток выводит его и вызывает метод SendAsync
.
Мне не очень нравится это решение, потому что оно тратит впустую поток, а на многоядерном компьютере отправка выполняется только одним потоком.
Более того, оба решения не подходят лучше, потому что они передают задание по вызову асинхронного метода другому потоку. Можно ли этого избежать?
Что ты думаешь? (Спасибо !!)
Thomas
РЕДАКТИРОВАТЬ 1:
Я могу воспроизвести проблему SocketError.OperationAborted
с помощью следующей тестовой программы (я думаю, что это правильно).
Скомпилируйте, запустите и telnet на порт 127.0.0.1:10000. Отправьте «t» или «T» и подождите> 3 секунды. При отправке «T» вызов ReceiveAsync
выполняется в ThreadPool (работает), при «t» запускается новый поток, который завершается через 3 секунды (происходит сбой).
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Collections.Generic;
namespace Test {
class Program {
static List<Socket> _referenceToSockets = new List<Socket>();
static void Main(string[] args) {
Thread.CurrentThread.Name = "Main Thread";
// Create a listening socket on Port 10000.
Socket ServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_referenceToSockets.Add(ServerSocket);
var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 10000);
ServerSocket.Bind(endPoint);
ServerSocket.Listen(50);
// Start listening.
var saeaAccept = new SocketAsyncEventArgs();
saeaAccept.Completed += OnCompleted;
ServerSocket.AcceptAsync(saeaAccept);
Console.WriteLine(String.Format("Listening on {0}.", endPoint));
Console.ReadLine();
}
private static void OnCompleted(object obj, SocketAsyncEventArgs evt) {
var socket = (Socket)obj;
Console.WriteLine(String.Format("Async operation completed: {0}; Error: {1}; Callback-Thread: \"{2}\" ({3} threadpool)", evt.LastOperation, evt.SocketError, Thread.CurrentThread.Name, Thread.CurrentThread.IsThreadPoolThread?"is":"no"));
switch (evt.LastOperation) {
case SocketAsyncOperation.Accept:
// Client connected. Listen for more.
Socket clientSocket = evt.AcceptSocket;
_referenceToSockets.Add(clientSocket);
evt.AcceptSocket = null;
socket.AcceptAsync(evt);
// Start receiving data.
var saeaReceive = new SocketAsyncEventArgs();
saeaReceive.Completed += OnCompleted;
saeaReceive.SetBuffer(new byte[1024], 0, 1024);
clientSocket.ReceiveAsync(saeaReceive);
break;
case SocketAsyncOperation.Disconnect:
socket.Close();
evt.Dispose();
break;
case SocketAsyncOperation.Receive:
if (evt.SocketError != SocketError.Success) {
socket.DisconnectAsync(evt);
return;
}
var asText = Encoding.ASCII.GetString(evt.Buffer, evt.Offset, evt.BytesTransferred);
Console.WriteLine(String.Format("Received: {0} bytes: \"{1}\"", evt.BytesTransferred, asText));
if (evt.BytesTransferred == 0) {
socket.Close();
evt.Dispose();
}
if (asText.ToUpper().StartsWith("T")) {
Action<object> action = (object o) => {
socket.ReceiveAsync(evt);
Console.WriteLine(String.Format("Called ReceiveAsync {0}...", o));
Thread.Sleep(3000);
Console.WriteLine("End of Action...");
};
if (asText.StartsWith("T")) {
ThreadPool.QueueUserWorkItem(o=>action(o), "in ThreadPool");
} else {
new Thread(o=>action(o)).Start("in a new Thread");
}
} else {
socket.ReceiveAsync(evt);
}
break;
}
}
}
}
РЕДАКТИРОВАТЬ # 3
Вот решение, которое я собираюсь использовать:
Я позволил потоку пользователя библиотеки напрямую вызывать операции XXXAsync
(кроме SendAsync
). В большинстве случаев вызов будет успешным (потому что вызывающий поток завершается очень редко).
Если операция завершилась неудачно с SocketError.OperationAborted
, библиотека просто снова вызывает операцию, используя текущий поток из асинхронного обратного вызова. Этот поток ThreadPool
имеет хорошие шансы на успех (будет установлен дополнительный флаг, чтобы использовать этот обходной путь не более одного раза, если причина для SocketError.OperationAborted
была вызвана какой-то другой ошибкой).
Это должно работать, потому что сам сокет все еще в порядке, просто предыдущая операция завершилась неудачно.
Для SendAsync
этот обходной путь не работает, поскольку он может испортить порядок сообщений. В этом случае я буду помещать сообщения в очередь в списке FIFO. Я буду использовать ThreadPool
, чтобы вывести их из очереди и отправить через SendAsync
.