Делайте правильные и быстрые TCP-сети в c # - PullRequest
2 голосов
/ 10 ноября 2011

Я пытаюсь внедрить Решение, в котором по графику обслуживания работают некоторые работники.Само планирование должно происходить через пользовательский протокол на основе tcp, потому что Workers могут работать либо на том же, либо на другом компьютере.

После некоторых исследований я обнаружил эту запись.Прочитав его, я обнаружил, что существует как минимум 3 различных возможных решения, каждое из которых имеет свои преимущества и недостатки.

Я решил воспользоваться решением Begin-End и написал небольшую тестовую программу, чтобы поиграть в нее.с этим.Мой клиент - простая программа, которая отправляет некоторые данные на сервер и затем завершает работу.Это код клиента:

class Client
{
    static void Main(string[] args)
    {
        var ep = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 12345);
        var s = new Socket(ep.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
        s.Connect(ep);
        Console.WriteLine("client Startet, socket connected");
        s.Send(Encoding.ASCII.GetBytes("1234"));
        s.Send(Encoding.ASCII.GetBytes("ABCDEFGH"));
        s.Send(Encoding.ASCII.GetBytes("A1B2C3D4E5F6"));
        Console.ReadKey();
        s.Close();
    }
}

Мой сервер почти такой же простой, как показано в следующем примере:

class Program
{
    static void Main(string[] args)
    {
        var server = new BeginEndTcpServer(8, 1, new IPEndPoint(IPAddress.Parse("127.0.0.1"), 12345));
        // var server = new ThreadedTcpServer(8, new IPEndPoint(IPAddress.Parse("127.0.0.1"), 12345));
        //server.ClientConnected += new EventHandler<ClientConnectedEventArgs>(server_ClientConnected);
        server.DataReceived += new EventHandler<DataReceivedEventArgs>(server_DataReceived);
        server.Start();
        Console.WriteLine("Server Started");
        Console.ReadKey();
    }

    static void server_DataReceived(object sender, DataReceivedEventArgs e)
    {
        Console.WriteLine("Receveived Data: " + Encoding.ASCII.GetString(e.Data));
    }
}


using System;
using System.Collections.Generic;
using System.Net; 
using System.Net.Sockets;

namespace TcpServerTest
{
public sealed class BeginEndTcpServer
{
    private class Connection
    {
        public Guid id;
        public byte[] buffer;
        public Socket socket;
    }

    private readonly Dictionary<Guid, Connection> _sockets;
    private Socket _serverSocket;
    private readonly int _bufferSize;
    private readonly int _backlog;
    private readonly IPEndPoint serverEndPoint;

    public BeginEndTcpServer(int bufferSize, int backlog, IPEndPoint endpoint)
    {
        _sockets = new Dictionary<Guid, Connection>();
        serverEndPoint = endpoint;
        _bufferSize = bufferSize;
        _backlog = backlog;
    }

    public bool Start()
    {
        //System.Net.IPHostEntry localhost = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName());
        try
        {
            _serverSocket = new Socket(serverEndPoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
        }
        catch (System.Net.Sockets.SocketException e)
        {
            throw new ApplicationException("Could not create socket, check to make sure not duplicating port", e);
        }
        try
        {
            _serverSocket.Bind(serverEndPoint);
            _serverSocket.Listen(_backlog);
        }
        catch (Exception e)
        {
            throw new ApplicationException("Error occured while binding socket, check inner exception", e);
        }
        try
        {
            //warning, only call this once, this is a bug in .net 2.0 that breaks if 
            // you're running multiple asynch accepts, this bug may be fixed, but
            // it was a major pain in the ass previously, so make sure there is only one
            //BeginAccept running
            _serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), _serverSocket);
        }
        catch (Exception e)
        {
            throw new ApplicationException("Error occured starting listeners, check inner exception", e);
        }
        return true;
    }

    public void Stop()
    {
        _serverSocket.Close();
        lock (_sockets)
            foreach (var s in _sockets)
                s.Value.socket.Close();
    }

    private void AcceptCallback(IAsyncResult result)
    {
        Connection conn = new Connection();
        try
        {
            //Finish accepting the connection
            System.Net.Sockets.Socket s = (System.Net.Sockets.Socket)result.AsyncState;
            conn = new Connection();
            conn.id = Guid.NewGuid();
            conn.socket = s.EndAccept(result);
            conn.buffer = new byte[_bufferSize];
            lock (_sockets)
                _sockets.Add(conn.id, conn);
            OnClientConnected(conn.id);
            //Queue recieving of data from the connection
            conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
            //Queue the accept of the next incomming connection
            _serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), _serverSocket);
        }
        catch (SocketException)
        {
            if (conn.socket != null)
            {
                conn.socket.Close();
                lock (_sockets)
                    _sockets.Remove(conn.id);
            }
            //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
            _serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), _serverSocket);
        }
        catch (Exception)
        {
            if (conn.socket != null)
            {
                conn.socket.Close();
                lock (_sockets)
                    _sockets.Remove(conn.id);
            }
            //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
            _serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), _serverSocket);
        }
    }

    private void ReceiveCallback(IAsyncResult result)
    {
        //get our connection from the callback
        Connection conn = (Connection)result.AsyncState;
        //catch any errors, we'd better not have any
        try
        {
            //Grab our buffer and count the number of bytes receives
            int bytesRead = conn.socket.EndReceive(result);
            //make sure we've read something, if we haven't it supposadly means that the client disconnected
            if (bytesRead > 0)
            {
                //put whatever you want to do when you receive data here
                conn.socket.Receive(conn.buffer);
                OnDataReceived(conn.id, (byte[])conn.buffer.Clone());
                //Queue the next receive
                conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
            }
            else
            {
                //Callback run but no data, close the connection
                //supposadly means a disconnect
                //and we still have to close the socket, even though we throw the event later
                conn.socket.Close();
                lock (_sockets)
                    _sockets.Remove(conn.id);
            }
        }
        catch (SocketException)
        {
            //Something went terribly wrong
            //which shouldn't have happened
            if (conn.socket != null)
            {
                conn.socket.Close();
                lock (_sockets)
                    _sockets.Remove(conn.id);
            }
        }
    }

    public bool Send(byte[] message, Guid connectionId)
    {
        Connection conn = null;
        lock (_sockets)
            if (_sockets.ContainsKey(connectionId))
                conn = _sockets[connectionId];
        if (conn != null && conn.socket.Connected)
        {
            lock (conn.socket)
            {
                //we use a blocking mode send, no async on the outgoing
                //since this is primarily a multithreaded application, shouldn't cause problems to send in blocking mode
                conn.socket.Send(message, message.Length, SocketFlags.None);
            }
        }
        else
            return false;
        return true;
    }

    public event EventHandler<ClientConnectedEventArgs> ClientConnected;
    private void OnClientConnected(Guid id)
    {
        if (ClientConnected != null)
            ClientConnected(this, new ClientConnectedEventArgs(id));
    }

    public event EventHandler<DataReceivedEventArgs> DataReceived;
    private void OnDataReceived(Guid id, byte[] data)
    {
        if (DataReceived != null)
            DataReceived(this, new DataReceivedEventArgs(id, data));
    }

    public event EventHandler<ConnectionErrorEventArgs> ConnectionError;

}

public class ClientConnectedEventArgs : EventArgs
{
    private readonly Guid _ConnectionId;
    public Guid ConnectionId { get { return _ConnectionId; } }

    public ClientConnectedEventArgs(Guid id)
    {
        _ConnectionId = id;
    }
}

public class DataReceivedEventArgs : EventArgs
{
    private readonly Guid _ConnectionId;
    public Guid ConnectionId { get { return _ConnectionId; } }

    private readonly byte[] _Data;
    public byte[] Data { get { return _Data; } }

    public DataReceivedEventArgs(Guid id, byte[] data)
    {
        _ConnectionId = id;
        _Data = data;
    }
}

public class ConnectionErrorEventArgs : EventArgs
{
    private readonly Guid _ConnectionId;
    public Guid ConnectionId { get { return _ConnectionId; } }

    private readonly Exception _Error;
    public Exception Error { get { return _Error; } }

    public ConnectionErrorEventArgs(Guid id, Exception ex)
    {
        _ConnectionId = id;
        _Error = ex;
    }
}

}

Моя проблема: Серверполучает только часть данных (в примере он получает только 'EFGHA1B2').Также, если я отправляю только 4 байта данных, сервер не получит их, пока соединение не будет закрыто.Что я упускаю или делаю неправильно?

Другое дело, в настоящий момент меня действительно смущают различные возможности, способ, которым я делаю это хорошее решение?Или я должен попробовать что-то еще?

Любая помощь будет принята с благодарностью!

Ответы [ 2 ]

3 голосов
/ 11 ноября 2011

Проблема в том, что сразу после получения первых двух байтов вы перезаписываете их следующим пакетом:

        int bytesRead = conn.socket.EndReceive(result);
        if (bytesRead > 0)
        {
            //** The line below reads the next batch of data
            conn.socket.Receive(conn.buffer);

            OnDataReceived(conn.id, (byte[])conn.buffer.Clone());

EndReceive поместит полученные данные в ваш буфер, поэтому нет необходимости вызывать Receive впоследствии. Причина, по которой вы получаете только средние 8 байтов, заключается в том, что после получения последней партии кодовые блоки в Receive ожидают больше данных. Когда клиент закрывает соединение, Receive возвращает 0, не изменяет буфер, и ваш обратный вызов вызывается с любым полученным EndReceive.

3 голосов
/ 11 ноября 2011

Я подозреваю, что сокет вызывает ваш метод обратного вызова только тогда, когда количество полученных байтов равно длине, указанной вами при вызове BeginReceive, или сокет закрыт.

Я думаю, что вопрос, который вам нужно задать себе, - это какова природа данных, которые вы будете отправлять через это соединение. Действительно ли данные представляют собой непрерывный поток байтов, например содержимое файла? Если это так, то, возможно, вы разумно ожидаете, что клиент отправит все байты, а затем немедленно закроет соединение. Как вы сказали, если клиент закрывает соединение, то для остальных байтов будет вызван ваш метод обратного вызова.

Если соединение останется открытым и будет использоваться для отправки отдельных сообщений, то вам может потребоваться разработать простой протокол для этой связи. Будет ли клиент отправлять несколько произвольно длинных строк, и нужно ли серверу обрабатывать каждую строку отдельно? Если это так, то одним из подходов может быть добавление к каждой строке целочисленного значения (скажем, 4 байта), которое указывает длину строки, которую вы будете отправлять. Затем ваш сервер может сначала вызвать BeginReceive, указав длину 4 байта, использовать эти 4 байта, чтобы определить длину входящей строки, а затем вызвать Receive, указав длину входящей строки. Если длина входящей строки превышает размер вашего буфера, вам нужно будет обработать и этот случай.

  1. Call BeginReceive для ожидания получения 4 байтов.
  2. Рассчитать длину входящей строки из этих 4 байтов.
  3. Call Receive для ожидания приема байтов для входящей строки.
  4. Повторите шаги 1-3 для следующей строки.

Я создал небольшой пример серверного и клиентского приложения, которое реализует этот подход. У меня буфер жестко закодирован как 2048 байт, но он все равно работает, если вы укажете буфер размером всего 4 байта.

В моем подходе, если входящие данные превышают размер моего существующего буфера, я создаю отдельный байтовый массив для хранения этих входящих данных. Возможно, это не лучший способ справиться с этой ситуацией, но я думаю, как вы справитесь с этой ситуацией, зависит от того, что вы на самом деле делаете с данными.

Сервер

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;

namespace SocketServer
{
    class Connection
    {
        public Socket Socket { get; private set; }
        public byte[] Buffer { get; private set; }
        public Encoding Encoding { get; private set; }

        public Connection(Socket socket)
        {
            this.Socket = socket;
            this.Buffer = new byte[2048];
            this.Encoding = Encoding.UTF8;
        }

        public void WaitForNextString(AsyncCallback callback)
        {
            this.Socket.BeginReceive(this.Buffer, 0, 4, SocketFlags.None, callback, this);
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            Connection connection;
            using (Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                listener.Bind(new IPEndPoint(IPAddress.Loopback, 6767));
                listener.Listen(1);
                Console.WriteLine("Listening for a connection.  Press any key to end the session.");
                connection = new Connection(listener.Accept());
                Console.WriteLine("Connection established.");
            }

            connection.WaitForNextString(ReceivedString);

            Console.ReadKey();
        }

        static void ReceivedString(IAsyncResult asyncResult)
        {
            Connection connection = (Connection)asyncResult.AsyncState;

            int bytesReceived = connection.Socket.EndReceive(asyncResult);

            if (bytesReceived > 0)
            {
                int length = BitConverter.ToInt32(connection.Buffer, 0);

                byte[] buffer;
                if (length > connection.Buffer.Length)
                    buffer = new byte[length];
                else
                    buffer = connection.Buffer;

                int index = 0;
                int remainingLength = length;
                do
                {
                    bytesReceived = connection.Socket.Receive(buffer, index, remainingLength, SocketFlags.None);
                    index += bytesReceived;
                    remainingLength -= bytesReceived;
                }
                while (bytesReceived > 0 && remainingLength > 0);

                if (remainingLength > 0)
                {
                    Console.WriteLine("Connection was closed before entire string could be received");
                }
                else
                {
                    Console.WriteLine(connection.Encoding.GetString(buffer, 0, length));
                }

                connection.WaitForNextString(ReceivedString);
            }
        }
    }
}

Клиент

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;

namespace SocketClient
{
    class Program
    {
        static void Main(string[] args)
        {
            var encoding = Encoding.UTF8;
            using (var connector = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                connector.Connect(new IPEndPoint(IPAddress.Loopback, 6767));

                string value;

                value = "1234";
                Console.WriteLine("Press any key to send \"" + value + "\".");
                Console.ReadKey();
                SendString(connector, encoding, value);

                value = "ABCDEFGH";
                Console.WriteLine("Press any key to send \"" + value + "\".");
                Console.ReadKey();
                SendString(connector, encoding, value);

                value = "A1B2C3D4E5F6";
                Console.WriteLine("Press any key to send \"" + value + "\".");
                Console.ReadKey();
                SendString(connector, encoding, value);

                Console.WriteLine("Press any key to exit.");
                Console.ReadKey();

                connector.Close();
            }
        }

        static void SendString(Socket socket, Encoding encoding, string value)
        {
            socket.Send(BitConverter.GetBytes(encoding.GetByteCount(value)));
            socket.Send(encoding.GetBytes(value));
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...