Обеспечение того, чтобы поток, вызывающий Socket.XXXAsync, оставался активным для выполнения запроса ввода-вывода (IO Completion Port, C #) - PullRequest
1 голос
/ 29 марта 2011

Я реализую небольшую библиотеку, чтобы упростить использование System.Net.Sockets.Socket. Он должен обрабатывать произвольное количество прослушивающих и принимающих TCP-сокетов, и важно, чтобы реализация была быстрой ~.

Я использую методы XXXAsync и CLR ThreadPool для обратного вызова делегатов пользователя библиотеки (например, всякий раз, когда сообщение было успешно отправлено или были получены некоторые данные). Предпочтительно, чтобы библиотека сама по себе не запускала никаких потоков.

Пользователь библиотеки может получить доступ к интерфейсу моей оболочки Sockets, чтобы отправить сообщение или начать получать сообщения (среди многих других методов и перегрузок):

public interface IClientSocket {
    // will eventually call Socket.SendAsync
    IMessageHandle SendAsync(byte[] buffer, int offset, int length);

    // will eventually call Socket.RecieveAsync
    void StartReceiveAsync();
}

Методы XXXAsync используют порты завершения ввода-вывода. Таким образом, поток, который вызывает эти методы, должен оставаться в живых, пока операция не завершится, в противном случае операция завершится с SocketError.OperationAborted (я думаю, что это так, или нет?). Наложение такого ограничения на пользователя библиотеки уродливо и подвержено ошибкам.

Какая лучшая альтернатива здесь?

  • Вызов ThreadPool.QueueUserWorkItem с делегатом для вызова методов XXXAsync? Это безопасно? Я где-то читал, что ThreadPool не будет останавливать бездействующие потоки, которые имеют IOCP. Это было бы хорошо, поскольку это решает проблему выше.
    Но это также может быть плохо со многими TCP-соединениями. В таком случае вполне вероятно, что каждый поток ThreadPool вызвал один из ожидающих вызовов ReceiveAsync. Поэтому ThreadPool никогда не будет сокращаться, даже если текущая рабочая нагрузка мала, а многие потоки простаивают (и тратят память).

  • Запуск выделенного потока, который всегда активен и вызывает методы XXXAsync. Например, когда пользователь библиотеки хочет отправить данные, он помещает делегата в синхронизированную очередь, поток выводит его и вызывает метод SendAsync.
    Мне не очень нравится это решение, потому что оно тратит впустую поток, а на многоядерном компьютере отправка выполняется только одним потоком.

Более того, оба решения не подходят лучше, потому что они передают задание по вызову асинхронного метода другому потоку. Можно ли этого избежать?

Что ты думаешь? (Спасибо !!)

Thomas

РЕДАКТИРОВАТЬ 1:

Я могу воспроизвести проблему SocketError.OperationAborted с помощью следующей тестовой программы (я думаю, что это правильно).
Скомпилируйте, запустите и telnet на порт 127.0.0.1:10000. Отправьте «t» или «T» и подождите> 3 секунды. При отправке «T» вызов ReceiveAsync выполняется в ThreadPool (работает), при «t» запускается новый поток, который завершается через 3 секунды (происходит сбой).

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Collections.Generic;

namespace Test {
    class Program {
        static List<Socket> _referenceToSockets = new List<Socket>();
        static void Main(string[] args) {
            Thread.CurrentThread.Name = "Main Thread";

            // Create a listening socket on Port 10000.
            Socket ServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            _referenceToSockets.Add(ServerSocket);
            var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 10000);
            ServerSocket.Bind(endPoint);
            ServerSocket.Listen(50);

            // Start listening.
            var saeaAccept = new SocketAsyncEventArgs();
            saeaAccept.Completed += OnCompleted;
            ServerSocket.AcceptAsync(saeaAccept);

            Console.WriteLine(String.Format("Listening on {0}.", endPoint));
            Console.ReadLine();
        }

        private static void OnCompleted(object obj, SocketAsyncEventArgs evt) {
            var socket = (Socket)obj;
            Console.WriteLine(String.Format("Async operation completed: {0}; Error: {1}; Callback-Thread: \"{2}\" ({3} threadpool)", evt.LastOperation, evt.SocketError, Thread.CurrentThread.Name, Thread.CurrentThread.IsThreadPoolThread?"is":"no"));
            switch (evt.LastOperation) {
                case SocketAsyncOperation.Accept:
                    // Client connected. Listen for more.
                    Socket clientSocket = evt.AcceptSocket;
                    _referenceToSockets.Add(clientSocket);
                    evt.AcceptSocket = null;
                    socket.AcceptAsync(evt);

                    // Start receiving data.
                    var saeaReceive = new SocketAsyncEventArgs();
                    saeaReceive.Completed += OnCompleted;
                    saeaReceive.SetBuffer(new byte[1024], 0, 1024);
                    clientSocket.ReceiveAsync(saeaReceive);
                    break;
                case SocketAsyncOperation.Disconnect:
                    socket.Close();
                    evt.Dispose();
                    break;
                case SocketAsyncOperation.Receive:
                    if (evt.SocketError != SocketError.Success) {
                        socket.DisconnectAsync(evt);
                        return;
                    }
                    var asText = Encoding.ASCII.GetString(evt.Buffer, evt.Offset, evt.BytesTransferred);
                    Console.WriteLine(String.Format("Received: {0} bytes: \"{1}\"", evt.BytesTransferred, asText));
                    if (evt.BytesTransferred == 0) {
                        socket.Close();
                        evt.Dispose();
                    }
                    if (asText.ToUpper().StartsWith("T")) {
                        Action<object> action = (object o) => {
                            socket.ReceiveAsync(evt);
                            Console.WriteLine(String.Format("Called ReceiveAsync {0}...", o));
                            Thread.Sleep(3000);
                            Console.WriteLine("End of Action...");
                        };
                        if (asText.StartsWith("T")) {
                            ThreadPool.QueueUserWorkItem(o=>action(o), "in ThreadPool");
                        } else {
                            new Thread(o=>action(o)).Start("in a new Thread");
                        }
                    } else {
                        socket.ReceiveAsync(evt);
                    }
                    break;
            }
        }
    }
}

РЕДАКТИРОВАТЬ # 3

Вот решение, которое я собираюсь использовать:

Я позволил потоку пользователя библиотеки напрямую вызывать операции XXXAsync (кроме SendAsync). В большинстве случаев вызов будет успешным (потому что вызывающий поток завершается очень редко).
Если операция завершилась неудачно с SocketError.OperationAborted, библиотека просто снова вызывает операцию, используя текущий поток из асинхронного обратного вызова. Этот поток ThreadPool имеет хорошие шансы на успех (будет установлен дополнительный флаг, чтобы использовать этот обходной путь не более одного раза, если причина для SocketError.OperationAborted была вызвана какой-то другой ошибкой). Это должно работать, потому что сам сокет все еще в порядке, просто предыдущая операция завершилась неудачно.

Для SendAsync этот обходной путь не работает, поскольку он может испортить порядок сообщений. В этом случае я буду помещать сообщения в очередь в списке FIFO. Я буду использовать ThreadPool, чтобы вывести их из очереди и отправить через SendAsync.

Ответы [ 2 ]

1 голос
/ 29 марта 2011

Пара красных флагов здесь. Если у вас возникли проблемы с поддержанием потока, то у вас есть поток, который не делает ничего полезного. В этом случае не имеет смысла использовать асинхронные методы. Если скорость - ваша единственная задача, то вам не следует использовать методы Async. Они требуют, чтобы Сокет захватил поток tp, чтобы сделать обратный вызов. Это небольшие накладные расходы, но нет , если обычный поток выполняет блокирующий вызов.

Асинхронные методы следует рассматривать только в том случае, если вам нужно хорошо масштабировать приложение, когда вы обрабатываете много одновременных соединений. Это приводит к очень бурным нагрузкам на процессор, идеально подходит для нитей tp.

0 голосов
/ 29 марта 2011

поток, который вызывает эти методы, должен оставаться в живых до завершения операции "

Я не уверен в достоверности этого утверждения. Когда вы запускаете сетевую операцию, это передаетсяотключение в системе ввода-вывода Windows. Когда происходит сетевое событие (то есть данные принимаются и ваше приложение уведомляется), это может успешно решиться другим потоком (даже если исходный поток давно умер).

Вы уверены, что не отбрасываете ссылку на что-то решающее и не допускаете его сбора мусора?

Если вы находитесь в ситуации, когда потоки могут умереть, то я подозреваю, чтоваш код не готов к масштабированию до мега-быстрого / звездного числа клиентов. Если вы действительно хотите масштабировать, то использование потоков должно быть очень жестким. Стоимость сдачи потока в аренду (где яесли предположить, что другому потребуется вращение, чтобы заменить его), это не то, что вы должны допустить, чтобы это происходило слишком часто. Пулы потоков очень определеннопуть сюда.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...