Включение асинхронного сокета Parallel и не только Concurrent в очень интенсивном приложении с использованием TPL - PullRequest
8 голосов
/ 29 апреля 2011

Я пишу приложение, которое использует Socket, и оно будет очень интенсивным, тогда мне действительно нужно использовать каждое ядро, которое у нас есть на нашем большом сервере.Я вижу вопрос ( как использовать ThreadPool для параллельного запуска потока сокета? ), здесь в stackoverflow есть только один ответ, который указывает на этот MSDN Sample .

НоЯ думаю, это указывает только на то, как сделать это Параллельным , а не Параллельным , здесь кто-то спрашивает Насколько интенсивно ЦП открывает сокет , и его внешний вид выглядит очень интенсивным,кто-то здесь говорит, что не помогает TPL TaskFactory.FromAsync vs Задачи с методами блокировки , а кто-то учит, как это сделать здесь, с помощью TaskFactory.FromAsync ( Существует ли шаблон для переноса существующих асинхронных методов BeginXXX / EndXXX васинхронные задачи? ).

Как я могу поддерживать параллельные и производительные операции с сокетами, и если они имеют дело с такими проблемами с сокетами, как разъединения, полусоединения сокетов и границы сообщений, это головная боль в обычном асинхронном режиме.Как с этим справиться, если собрать вместе TPL и Task.

Ответы [ 2 ]

3 голосов
/ 17 ноября 2011

см. Что:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace skttool
{
    public class StateObject
    {
        public Socket workSocket = null;
        public const int BufferSize = 1024;
        public byte[] buffer = new byte[BufferSize];
        public int bytesRead = 0;
        public StringBuilder sb = new StringBuilder();
    }

    public class tool
    {
        //-------------------------------------------------
        private ManualResetEvent evtConnectionDone = new ManualResetEvent(false);
        private Socket skttool = null;
        private bool running = false;
        private StateObject state = null;
        //-------------------------------------------------
        toolConfig _cfg;
        public tool(toolConfig cfg)
        {
            _cfg = cfg;
        }
        //-------------------------------------------------
        public void socketListeningSet()
        {
            IPEndPoint localEndPoint;
            Socket skttool;
            byte[] bytes = new Byte[1024];
            localEndPoint = new IPEndPoint(IPAddress.Any, _cfg.addressPort);
            skttool = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            skttool.Bind(localEndPoint);
            skttool.Listen(_cfg.maxQtdSockets);
        }
        //-------------------------------------------------
        public void start()
        {
            running = true;
            Task T1 = Task.Factory.StartNew(socketListeningSet);
            T1.ContinueWith(prev =>
            {
                while (running)
                {
                    evtConnectionDone.Reset();
                    Task<Socket> accepetChunk = Task<Socket>.Factory.FromAsync(
                                                                       skttool.BeginAccept,
                                                                       skttool.EndAccept,
                                                                       accept,
                                                                       skttool,
                                                                       TaskCreationOptions.AttachedToParent);
                    accepetChunk.ContinueWith(accept, TaskContinuationOptions.NotOnFaulted | TaskCreationOptions.AttachedToParent);
                    evtConnectionDone.WaitOne();
                }
            });
        }
        //-------------------------------------------------
        void accept(Task<Socket> accepetChunk)
        {
            state = new StateObject();
            evtConnectionDone.Set();
            state.workSocket = accepetChunk.Result;
            Task<int> readChunk = Task<int>.Factory.FromAsync(
                                                       state.workSocket.BeginReceive,
                                                       state.workSocket.EndReceive,
                                                       state.buffer,
                                                       state.bytesRead,
                                                       state.buffer.Length - state.bytesRead,
                                                       null,
                                                       TaskCreationOptions.AttachedToParent);
            readChunk.ContinueWith(read, TaskContinuationOptions.NotOnFaulted | TaskCreationOptions.AttachedToParent);
        }
        //-------------------------------------------------
        void read(Task<int> readChunk)
        {
            state.bytesRead += readChunk.Result;
            if (readChunk.Result > 0 && state.bytesRead < state.buffer.Length)
            {
                read();
                return;
            }
            _data = doTask(_data);
            Task<int> sendChunk = Task<int>.Factory.FromAsync(
                                                       state.workSocket.BeginSend,
                                                       state.workSocket.EndSend,
                                                       state.buffer,
                                                       state.bytesRead,
                                                       state.buffer.Length - state.bytesRead,
                                                       null,
                                                       TaskCreationOptions.AttachedToParent);
            sendChunk.ContinueWith(send, TaskContinuationOptions.NotOnFaulted | TaskCreationOptions.AttachedToParent);
        }
        //-------------------------------------------------
        void send(Task<int> readChunk)
        {
            state.workSocket.Shutdown(SocketShutdown.Both);
            state.workSocket.Close();
        }
        //-------------------------------------------------
        byte[] doTask(byte[] data)
        {
            return Array.Reverse(data);
        }
        //-------------------------------------------------
    }
}
1 голос
/ 29 апреля 2011

См. Эту ссылку о TPL и традиционном .NET асинхронном программировании , он не отвечает, но, возможно, может помочь вам.Имеется информация о Модель асинхронного программирования (APM) и Асинхронный шаблон на основе событий (EAP)

...