С ++ Сокеты и стресс-тестирование - PullRequest
0 голосов
/ 27 июня 2018

Я написал очень простой сокет-сервер на C ++ (MinGW), используя такие общие функции, как

socket( PF_INET, SOCK_STREAM, 0 )...
setsockopt( s, SOL_SOCKET, SO_REUSEADDR, &OptVal, sizeof( OptVal ) )...
bind( s, ( struct sockaddr * ) &ServerAddress, sizeof( ServerAddress ) )...
listen( s, 10 )...

Обработка нескольких клиентских подключений выполняется

select( s, &FileDescriptorClient, NULL, NULL, &tv )...
accept( Server->GetSocketHandle(), (struct sockaddr*) &ClientAddress, &Length )...

Все выглядело очень хорошо и красиво, ... пока я не решил стресс-тест мой сервер.

Моим первым тестом был очень простой клиент, который делал только одно: подключался и отключался бесконечным циклом - как можно быстрее. Хотя этот тест был чрезвычайно прост, он сразу же провалился.

Для меня не было большим сюрпризом, что сервер будет задыхаться от стольких переключающихся соединений, поэтому я добавил Sleep (5) (миллисекунд) в клиент перед каждым подключением и отключением, и все было в порядке. На данный момент.

Мои вопросы:

  • Как правильно обрабатывать эти переподключения?
  • А как правильно провести стресс-тест приложения сервера сокетов?

Сейчас процедура выглядит следующим образом:

  1. клиент: подключается к серверу с помощью connect (...)
  2. сервер: новое соединение распознается с помощью select (...) и accept (...). Каждое новое соединение сохраняется в std :: vector.
  3. клиент: отключается от сервера с помощью closesocket (...) (MinGW ...)
  4. сервер: recv (...) читает 0 байтов, что означает, что клиент отключился от сервера
  5. сервер: выполняет closesocket (...) и удаляет соединение из std :: vector
  6. Перейти к 1

Как уже упоминалось: это будет работать только тогда, когда я задушу клиента с помощью сна. Как только я сокращаю время ожидания, сервер начинает пропускать pt. 4 (разъединяет) и запасает открытые соединения по линии.

Какой смысл я упускаю?

Edit:

  • сервер работает в одном потоке
  • клиентский сокет не блокируется

Редактировать 2:

По запросу: исходный код (сокращен до минимальной версии):

main.cpp сервера:

#include <iostream>
#include "Server.h"
using namespace std;

int main()
{
    try
    {
        ServerSocket srv;

        while ( true )
        {
            srv.Open();
            srv.Run();
        }
    }
    catch( ServerException const &e )
    {
        std::cout << "failed to run the server" << e.what() << std::endl;
    }

    return 0;
}

server.h:

#ifndef SERVER_H
#define SERVER_H

#define SOCKETBUFLEN 10

#include <string>
#include <cstdio>
#include <vector>
#include <memory>
#include <unistd.h>

#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>

#include <sys/types.h>

class Exception : public std::exception
{
    private:
        std::string Message;
    public:
        template< typename ... Arguments >
        Exception( const char * AFormat, Arguments&& ... AArguments )
        {
            char Buffer[ 1024 ];
            sprintf( Buffer, AFormat, std::forward<Arguments>( AArguments ) ... );
            Message = std::string( Buffer );
        };
        const char* what() const noexcept{ return Message.c_str(); }
};

class ServerException : public Exception
{
    public:
        template< typename ... Arguments >
        ServerException( const char * AFormat, Arguments&& ... AArguments ) : Exception( AFormat, std::forward< Arguments >( AArguments ) ... ){}
};

class ClientException : public Exception
{
    public:
        template< typename ... Arguments >
        ClientException( const char * AFormat, Arguments&& ... AArguments ) : Exception( AFormat, std::forward< Arguments >( AArguments ) ... ){}
};

class Client
{
    public:
        Client();
        virtual ~Client();

        virtual bool Authenticate( const std::string & AClientId ) = 0;
        virtual bool Process( const std::string & AMessage ) = 0;
    protected:
        std::vector< std::string > Clients;
    private:
};

class ServerSocket;
class ClientSocket;
class Socket;

class ClientSockets
{
    private:
        ServerSocket *OwnerServer;
        fd_set FileDescriptorClient;
        std::vector< ClientSocket * > Items;
        std::vector< Socket * > SocketsRead;
        void Append( ClientSocket * );
        void OnClientConnect( SOCKET ASocketHandle, struct sockaddr_in AClientAddress );
    public:
        ClientSockets( ServerSocket * );
        int Count( void );
        void Remove( SOCKET ASocketHandle );
        bool HandleConnnections( void );
        void Read( void );
};

class Socket
{
    protected:
        SOCKET SocketHandle;
    public:
        Socket( SOCKET ASocketHandle = 0 ) { SocketHandle = ASocketHandle; };
        SOCKET GetSocketHandle( void ) { return SocketHandle; };
        virtual void Close( void ) = 0;
};

class ServerSocket : public Socket
{
    public:
        ServerSocket( const std::string &AHost = "127.0.0.1", const int &APort = 24442 );
        virtual ~ServerSocket();

        void Open( void );
        void Close( void ) {};
        void Run( void );

        fd_set FileDescriptorServer;
    private:
        std::string Host;
        int Port;
        ClientSockets *clientSockets;
};

class ClientSocket : public Socket
{
    private:
        ClientSockets *FClientSockets;
    public:
        ClientSocket( ClientSockets *, SOCKET ASocketHandle );
        virtual ~ClientSocket() {};

        void Close( void );
};

#endif // SERVER_H

server.cpp:

#include "Server.h"

#include <cstring>
#include <cstdarg>
#include <cstdio>
#include <algorithm>
#include <iostream>
#include <thread>
#include <chrono>

#define LOG_CONNECTION_AMOUNT 100

ServerSocket::ServerSocket( const std::string &AHost, const int &APort ) :
    Host( AHost ),
    Port( APort ),
    clientSockets( new ClientSockets( this ) ) {}

ServerSocket::~ServerSocket()
{
    WSACleanup();
}

void ServerSocket::Open( void )
{
    WSADATA wsa;

    if ( WSAStartup( MAKEWORD( 2, 2 ), &wsa ) != 0 )
    {
        throw ServerException( "WSAStartup failed" );
    }

    if ( ( SocketHandle = socket( PF_INET, SOCK_STREAM, 0 ) ) == INVALID_SOCKET )
    {
        throw ServerException( "%s: unable to get a socket handle. errno: %ld", __PRETTY_FUNCTION__, errno );
    }

    char OptVal = 1;

    if ( setsockopt( SocketHandle, SOL_SOCKET, SO_REUSEADDR, &OptVal, sizeof( OptVal ) ) == -1 )
    {
        throw ServerException( "%s: setsockopt failed", __PRETTY_FUNCTION__ );
    }

    struct sockaddr_in ServerAddress;

    memset( & ServerAddress, 0x0, sizeof( ServerAddress ) );

    ServerAddress.sin_family = AF_INET;
    ServerAddress.sin_addr.s_addr = inet_addr( Host.c_str() );
    ServerAddress.sin_port = htons( Port );

    if ( bind( SocketHandle, ( struct sockaddr * ) &ServerAddress, sizeof( ServerAddress ) ) == -1 )
    {
        throw ServerException( "%s: binding server address failed", __PRETTY_FUNCTION__ );
    }

    if ( listen( SocketHandle, 10 ) == -1 )
    {
        throw ServerException( "%s: listen socket failed", __PRETTY_FUNCTION__ );
    }

    FD_ZERO( &FileDescriptorServer );
    FD_SET( SocketHandle, &FileDescriptorServer );

    std::cout << "socket server fd '" << SocketHandle << "' listening on " << Host << ":" << Port << std::endl;
}

void ServerSocket::Run( void )
{
    std::cout << __PRETTY_FUNCTION__  << std::endl;

    do
    {
        try
        {
            if ( !clientSockets->HandleConnnections() )
            {
                continue;
            }

            clientSockets->Read();
        }
        catch( ServerException const &e )
        {
            int ErrorCode = WSAGetLastError();

            std::cout << "error while server running: " << e.what() << ", code: " << ErrorCode << std::endl;

            if ( ErrorCode == 10038 ) // no socket error
            {
                // continue; // not sure if this is the correct way to do...
            }

            // if ( ErrorCode == 10014 ) {} // bad address error
            // if ( ErrorCode == 10054 ) {} // connection reset by peer

            break;
        }
        catch( ... )
        {
            std::cout << "some exception occurred" << std::endl;
        }
    } while ( true );
}

ClientSockets::ClientSockets( ServerSocket * AOwnerServer )
{
    OwnerServer = AOwnerServer;
}

void ClientSockets::Append( ClientSocket *AClientSocket )
{
    Items.push_back( AClientSocket );
}

int ClientSockets::Count( void )
{
    return Items.size();
}

bool ClientSockets::HandleConnnections( void )
{
    FD_ZERO( & FileDescriptorClient );

    SocketsRead.clear();
    SocketsRead.push_back( OwnerServer );

    for ( auto i : Items )
    {
        SocketsRead.push_back( i );
    }

    for ( auto Socket : SocketsRead )
    {
        if ( FD_ISSET( Socket->GetSocketHandle(), & OwnerServer->FileDescriptorServer ) )
        {
            FD_SET( Socket->GetSocketHandle(), & FileDescriptorClient );
        }
    }

    struct timeval tv;
    tv.tv_sec = 10;
    tv.tv_usec = 3;

    int SocketChangedCount = 0;

    if ( ( SocketChangedCount = select( 0, &FileDescriptorClient, NULL, NULL, &tv ) ) == -1 )
    {
        throw ServerException( "%s: select failed", __PRETTY_FUNCTION__ );
    }

    static int scc = 0;
    if ( scc != SocketChangedCount )
    {
        scc = SocketChangedCount;
        std::cout << "socket changes: " << SocketChangedCount << std::endl;
    }


    if ( FD_ISSET( OwnerServer->GetSocketHandle(), &FileDescriptorClient ) )
    {
        struct sockaddr_in ClientAddress;

        socklen_t Length = sizeof( ClientAddress );

        SOCKET SocketHandle = 0;

        if ( ( SocketHandle = accept( OwnerServer->GetSocketHandle(), (struct sockaddr*) &ClientAddress, &Length ) ) == INVALID_SOCKET )
        {
            throw ServerException( "%s: accept failed", __PRETTY_FUNCTION__ );
        }
        else
        {
            OnClientConnect( SocketHandle, ClientAddress );
        }

        FD_SET( SocketHandle, &OwnerServer->FileDescriptorServer );
    }

    return ( SocketChangedCount > 0 );
}

void ClientSockets::OnClientConnect( SOCKET ASocketHandle, struct sockaddr_in AClientAddress )
{
#if LOG_CONNECTION_AMOUNT > 0
    static int ClientConnectionCounter = 0;

    if ( ClientConnectionCounter++ % LOG_CONNECTION_AMOUNT == 0 )
    {
        std::cout << "connect     fd: [" << ASocketHandle << "], serial: [" << ClientConnectionCounter << "], cnt: [" << Count() << "]" << std::endl;
    }
#endif

    ClientSocket *clientSocket = new ClientSocket( this, ASocketHandle );

    Append( clientSocket );
}

void ClientSockets::Read( void )
{
    for ( auto clientSocket : Items )
    {
        if ( FD_ISSET( clientSocket->GetSocketHandle(), &FileDescriptorClient ) )
        {
            char buf[ SOCKETBUFLEN + 1 ];
            memset( buf, 0x0, sizeof( buf ) );

            int BytesReceived = recv( clientSocket->GetSocketHandle(), buf, SOCKETBUFLEN, 0 );

            if ( BytesReceived > 0 )
            {
                std::string Message = ( std::string ) buf;

                if ( Message.substr( 0, 4 ) == "quit" )
                {
                    clientSocket->Close();
                    break;
                }
                else
                {
                    // ... do fancy stuff here...
                    std::cout << "read from socket: " << Message << std::endl;
                }
            }
            else if ( BytesReceived == 0 )
            {
                clientSocket->Close();
                break;
            }
            else
            {
                throw ServerException( "reading data failed" );
            }
        }
    }
}



void ClientSockets::Remove( SOCKET ASocketHandle )
{
    FD_CLR( ASocketHandle, & FileDescriptorClient );

    std::vector< ClientSocket * >::iterator it =
        Items.erase(
            std::remove_if(
                Items.begin(),
                Items.end(),
                [ ASocketHandle ]( Socket *sock ){ return sock->GetSocketHandle() == ASocketHandle; }
            )
        );

    delete *it;
}


void ClientSocket::Close( void )
{
#if LOG_CONNECTION_AMOUNT > 0
    static int ClientDisconnectionCounter = 0;

    if ( ClientDisconnectionCounter++ % LOG_CONNECTION_AMOUNT == 0 )
    {
        std::cout << "disconnnect fd: [" << SocketHandle << "], serial: [" << ClientDisconnectionCounter << "], cnt: [" << FClientSockets->Count() << "]" << std::endl;
    }
#endif

    if ( closesocket( SocketHandle ) == SOCKET_ERROR )
    {
        throw ClientException( "closing socket failed: %ld", errno );
    }

    FClientSockets->Remove( SocketHandle );
}

ClientSocket::ClientSocket( ClientSockets *AClientSockets, SOCKET ASocketHandle ) :
    Socket( ASocketHandle ),
    FClientSockets( AClientSockets ) {}

Тестирующее клиентское приложение очень примитивно: я использовал Embarcadero C ++ Builder с компонентом TClientSocket, который непрерывно переключает свое состояние соединения (в режиме без блокировки). 3 флажка используются, чтобы указать приложению работать в циклическом режиме (1), переходить в спящий режим перед подключением (2) и переходить в спящий режим перед отключением (3), а 1 TEdit используется для изменения продолжительности ожидания в миллисекундах.

Код файла cpp:

#include <vcl.h>
#pragma hdrstop

#include "UnitFormMain.h"

#pragma package(smart_init)
#pragma resource "*.dfm"

TFormMain *FormMain;
//---------------------------------------------------------------------------
__fastcall TFormMain::TFormMain(TComponent* Owner)
    : TForm(Owner), Trigger( false ), sock( 0 )
{
}
//---------------------------------------------------------------------------
void __fastcall TFormMain::ClientSocket1Connect(TObject *Sender, TCustomWinSocket *Socket)

{
    static int ConnectionCounter = 0;

    if ( ConnectionCounter++ % 100 == 0 )
    {
        Memo1->Lines->Add( System::Sysutils::Format( "connectionCounter '%d'", ARRAYOFCONST(( ConnectionCounter )) ) );
    }

    if ( CheckBoxRunLoopComponent->Checked )
    {
        Trigger = true;
    }
}

//---------------------------------------------------------------------------
void __fastcall TFormMain::ClientSocket1Disconnect(TObject *Sender, TCustomWinSocket *Socket)
{

    if ( CheckBoxRunLoopComponent->Checked )
    {
        Trigger = true;
    }

}
//---------------------------------------------------------------------------
void __fastcall TFormMain::ButtonToggleConnectionComponentClick(TObject *Sender)
{
    ClientSocket1->Active = !ClientSocket1->Active;
}

//---------------------------------------------------------------------------
void __fastcall TFormMain::CheckBoxRunLoopComponentClick(TObject *Sender)
{
    if ( dynamic_cast< TCheckBox * >( Sender )->Checked )
    {

        for (;;)
        {
            Application->ProcessMessages();

            if ( !dynamic_cast< TCheckBox * >( Sender )->Checked )
            {
                break;
            }

            ToggleConnection();
        }
    }
}

void TFormMain::ToggleConnection( void )
{
    if ( Trigger )
    {
        Trigger = false;

        if ( ClientSocket1->Active )
        {
            if ( CheckBoxSleepOnDisconnect )
            {
                Sleep( StrToInt( Edit1->Text ) );
            }

            ClientSocket1->Close();

        }
        else
        {
            if ( CheckBoxSleepOnConnect->Checked )
            {
                Sleep( StrToInt( Edit1->Text ) );
            }

            ClientSocket1->Open();
        }
    }
}

//---------------------------------------------------------------------------

Намного интереснее вывод тестового прогона (сервера):

Каждый сотый тумблер протоколируется. "cnt" - это количество соединений в std :: vector <>.

Тест начался с 5 миллисекундного сна при каждом переключении. Где-то около 1500-го переключения я сократил время ожидания до 1 миллисекунды, что привело к немедленному увеличению активных соединений в списке серверов. Через несколько тысяч соединений select() завершается с 10038.

socket server fd '276' listening on 127.0.0.1:24442
void ServerSocket::Run()
socket changes: 1
connect     fd: [280], serial: [1], cnt: [0]
disconnnect fd: [280], serial: [1], cnt: [1]
connect     fd: [284], serial: [101], cnt: [0]
disconnnect fd: [284], serial: [101], cnt: [1]
connect     fd: [280], serial: [201], cnt: [0]
disconnnect fd: [280], serial: [201], cnt: [1]
connect     fd: [288], serial: [301], cnt: [0]
disconnnect fd: [288], serial: [301], cnt: [1]
connect     fd: [292], serial: [401], cnt: [0]
disconnnect fd: [292], serial: [401], cnt: [1]
connect     fd: [296], serial: [501], cnt: [0]
disconnnect fd: [296], serial: [501], cnt: [1]
connect     fd: [308], serial: [601], cnt: [0]
disconnnect fd: [308], serial: [601], cnt: [1]
connect     fd: [296], serial: [701], cnt: [0]
disconnnect fd: [296], serial: [701], cnt: [1]
connect     fd: [296], serial: [801], cnt: [0]
disconnnect fd: [296], serial: [801], cnt: [1]
connect     fd: [288], serial: [901], cnt: [0]
disconnnect fd: [288], serial: [901], cnt: [1]
connect     fd: [300], serial: [1001], cnt: [0]
disconnnect fd: [300], serial: [1001], cnt: [1]
connect     fd: [312], serial: [1101], cnt: [0]
disconnnect fd: [312], serial: [1101], cnt: [1]
connect     fd: [300], serial: [1201], cnt: [0]
disconnnect fd: [300], serial: [1201], cnt: [1]
connect     fd: [300], serial: [1301], cnt: [0]
disconnnect fd: [300], serial: [1301], cnt: [1]
connect     fd: [280], serial: [1401], cnt: [0]
disconnnect fd: [280], serial: [1401], cnt: [1]
connect     fd: [280], serial: [1501], cnt: [0]
disconnnect fd: [280], serial: [1501], cnt: [1]
connect     fd: [316], serial: [1601], cnt: [0]
socket changes: 2
disconnnect fd: [316], serial: [1601], cnt: [2]
socket changes: 1
socket changes: 2
socket changes: 1
socket changes: 2
socket changes: 1
connect     fd: [312], serial: [1701], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [312], serial: [1701], cnt: [4]
connect     fd: [324], serial: [1801], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [312], serial: [1801], cnt: [4]
connect     fd: [324], serial: [1901], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [324], serial: [1901], cnt: [4]
connect     fd: [320], serial: [2001], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [320], serial: [2001], cnt: [4]
socket changes: 2
socket changes: 1
connect     fd: [336], serial: [2101], cnt: [3]
disconnnect fd: [336], serial: [2101], cnt: [4]
connect     fd: [344], serial: [2201], cnt: [3]
disconnnect fd: [344], serial: [2201], cnt: [4]
connect     fd: [344], serial: [2301], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [336], serial: [2301], cnt: [4]
socket changes: 2
socket changes: 1
connect     fd: [344], serial: [2401], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [336], serial: [2401], cnt: [4]
connect     fd: [344], serial: [2501], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [344], serial: [2501], cnt: [4]
connect     fd: [344], serial: [2601], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [344], serial: [2601], cnt: [4]
connect     fd: [332], serial: [2701], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [332], serial: [2701], cnt: [4]
connect     fd: [344], serial: [2801], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [332], serial: [2801], cnt: [4]
socket changes: 2
socket changes: 1
connect     fd: [328], serial: [2901], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [324], serial: [2901], cnt: [4]
socket changes: 2
socket changes: 1
connect     fd: [328], serial: [3001], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [324], serial: [3001], cnt: [4]
socket changes: 2
socket changes: 1
connect     fd: [328], serial: [3101], cnt: [3]
socket changes: 2
socket changes: 1
disconnnect fd: [324], serial: [3101], cnt: [4]
connect     fd: [324], serial: [3201], cnt: [3]
disconnnect fd: [324], serial: [3201], cnt: [4]
connect     fd: [324], serial: [3301], cnt: [3]
disconnnect fd: [324], serial: [3301], cnt: [4]
socket changes: 2
socket changes: 1
error while server running: bool ClientSockets::HandleConnnections(): select failed, code: 10038
socket server fd '324' listening on 127.0.0.1:24442
void ServerSocket::Run()
socket changes: 0

В этом протоколе я также вижу, что select() возвращает значение> 1 («Socket changes: n»), что означает, что более 1 сокета изменили свой статус. Я думаю, что это может быть указатель на правильное решение, но сейчас я не знаю, как с этим справиться.

Редактировать 3: Блокировка / отсутствие блокировки: в CPPBuilder вы можете использовать эти компоненты времени разработки, где вы можете установить свойства во время разработки:

TClientSocktet property editor

Ответы [ 2 ]

0 голосов
/ 28 июня 2018

У моего кода была пара очень серьезных проблем, и теперь я думаю, что я их исправил. Может быть.

На сервере нужно было сделать четыре основных изменения.

Изменение # 1: вызов accept () только один раз, когда select () возвращает 2 неправильно.

Я уже думал, что мой оригинальный код неверен, когда я не предпринимаю никаких действий, когда select () возвращает значение больше 1. Обычно это (почти) никогда не происходит, поэтому у меня есть эта ошибка во всех моих приложениях сокетов прошедшее десятилетие безрезультатно Поэтому я изменил код:

int SocketChangedCount = 0;

if ( ( SocketChangedCount = select( 0, &FileDescriptorClient, NULL, NULL, &tv ) ) == -1 )
{
    throw ServerException( "%s: select failed", __PRETTY_FUNCTION__ );
}

if ( FD_ISSET( OwnerServer->GetSocketHandle(), &FileDescriptorClient ) )
{
    struct sockaddr_in ClientAddress;

    socklen_t Length = sizeof( ClientAddress );

    SOCKET SocketHandle = 0;

    // change N° 1:
    // the return value of select is the number of new connections.
    // therefore we need to call accept for EACH new connection.
    // this issue did never light up, because the connections have
    // to be established almost simultaneously to trigger this issue.

    for ( int i = 0; i < SocketChangedCount; ++i )
    {
        if ( ( SocketHandle = accept( OwnerServer->GetSocketHandle(), (struct sockaddr*) &ClientAddress, &Length ) ) == INVALID_SOCKET )
        {
            throw ServerException( "%s: accept failed", __PRETTY_FUNCTION__ );
        }
        else
        {
            OnClientConnect( SocketHandle, ClientAddress );
        }

        FD_SET( SocketHandle, &OwnerServer->FileDescriptorServer );
    }
}

Изменение № 2: Хотя это и не было основной проблемой, очевидно, что размер невыполненного задания был небольшим:

// change N° 2:
// a backlog parameter of "10" is not enough. SOMAXCONN should be the preferable value
// if ( listen( SocketHandle, 10 ) == -1 )
if ( listen( SocketHandle, SOMAXCONN ) == -1 )
{
    throw ServerException( "%s: listen socket failed", __PRETTY_FUNCTION__ );
}

Изменение # 3: я изменил std :: vector в std :: unordered_map:

    // change N° 3:
    std::unordered_map< SOCKET, ClientSocket * > Items;
    // std::vector< Socket * > SocketsRead;

Случайно это также исправило ошибку, которую я даже раньше не замечал: удаление вышедшего клиента было просто неправильным:

void ClientSockets::Remove( SOCKET ASocketHandle )
{
    FD_CLR( ASocketHandle, & FileDescriptorClient );

/*
    // whatever this code did: it was wrong
    std::vector< ClientSocket * >::iterator it =
        Items.erase(
            std::remove_if(
                Items.begin(),
                Items.end(),
                [ ASocketHandle ]( Socket *sock ){ return sock->GetSocketHandle() == ASocketHandle; }
            )
        );

    delete *it;
*/

    std::unordered_map< SOCKET, ClientSocket * >::iterator it = Items.find( ASocketHandle );

    if ( it != Items.end() )
    {
        Items.erase( it );
    }
}

Изменение № 4: Последнее, но не менее важное: расстались слишком рано

Всякий раз, когда клиент отключался, я останавливал цикл с перерывом. Глупый я.

void ClientSockets::Read( void )
{
    for ( auto clientSocket : Items )
    {
        if ( FD_ISSET( clientSocket.first, &FileDescriptorClient ) )
        {
            char buf[ SOCKETBUFLEN + 1 ];
            memset( buf, 0x0, sizeof( buf ) );

            int BytesReceived = recv( clientSocket.first, buf, SOCKETBUFLEN, 0 );

            if ( BytesReceived > 0 )
            {
                std::string Message = ( std::string ) buf;

                if ( Message.substr( 0, 4 ) == "quit" )
                {
                    clientSocket.second->Close();
                    continue;
                    // break;
                }
                else
                {
                    // ... do fancy stuff here...
                    std::cout << "read from socket: " << Message << std::endl;
                }
            }
            else if ( BytesReceived == 0 )
            {
                clientSocket.second->Close();
                // change N° 4:
                // breaking the loop is a really bad idea
                continue;
                // break;
            }
            else
            {
                throw ServerException( "reading data failed" );
            }
        }
    }
}

Спасибо вам, ребята, я наконец-то смог запустить приложение для стресс-теста и обнаружил пару проблем, которых никогда бы не нашел.

Если кому-то еще нравится делать что-то вроде этого: Вот фиксированный код:

Заголовок:

#ifndef SERVER_H
#define SERVER_H

#define SOCKETBUFLEN 10

#include <string>
#include <cstdio>
#include <unordered_map>
#include <memory>
#include <unistd.h>

#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>

#include <sys/types.h>

class Exception : public std::exception
{
    private:
        std::string Message;
    public:
        template< typename ... Arguments >
        Exception( const char * AFormat, Arguments&& ... AArguments )
        {
            char Buffer[ 1024 ];
            sprintf( Buffer, AFormat, std::forward<Arguments>( AArguments ) ... );
            Message = std::string( Buffer );
        };
        const char* what() const noexcept{ return Message.c_str(); }
};

class ServerException : public Exception
{
    public:
        template< typename ... Arguments >
        ServerException( const char * AFormat, Arguments&& ... AArguments ) : Exception( AFormat, std::forward< Arguments >( AArguments ) ... ){}
};

class ClientException : public Exception
{
    public:
        template< typename ... Arguments >
        ClientException( const char * AFormat, Arguments&& ... AArguments ) : Exception( AFormat, std::forward< Arguments >( AArguments ) ... ){}
};

class Client
{
    public:
        Client();
        virtual ~Client();

        virtual bool Authenticate( const std::string & AClientId ) = 0;
        virtual bool Process( const std::string & AMessage ) = 0;
    private:
};

class ServerSocket;
class ClientSocket;
class Socket;

class ClientSockets
{
    private:
        ServerSocket *OwnerServer;
        fd_set FileDescriptorClient;
        // change N° 3:
        std::unordered_map< SOCKET, ClientSocket * > Items;
        // std::vector< Socket * > SocketsRead;
        void Append( ClientSocket * );
        void OnClientConnect( SOCKET ASocketHandle, struct sockaddr_in AClientAddress );
    public:
        ClientSockets( ServerSocket * );
        int Count( void );
        void Remove( SOCKET ASocketHandle );
        bool HandleConnnections( void );
        void Read( void );
};

class Socket
{
    protected:
        SOCKET SocketHandle;
    public:
        Socket( SOCKET ASocketHandle = 0 ) { SocketHandle = ASocketHandle; };
        SOCKET GetSocketHandle( void ) { return SocketHandle; };
        virtual void Close( void ) = 0;
};

class ServerSocket : public Socket
{
    public:
        ServerSocket( const std::string &AHost = "127.0.0.1", const int &APort = 24442 );
        virtual ~ServerSocket();

        void Open( void );
        void Close( void ) {};
        void Run( void );

        fd_set FileDescriptorServer;
    private:
        std::string Host;
        int Port;
        ClientSockets *clientSockets;
};

class ClientSocket : public Socket
{
    private:
        ClientSockets *FClientSockets;
    public:
        ClientSocket( ClientSockets *, SOCKET ASocketHandle );
        virtual ~ClientSocket() {};

        void Close( void );
};

#endif // SERVER_H

Источник:

#include "Server.h"

#include <cstring>
#include <cstdarg>
#include <cstdio>
#include <algorithm>
#include <iostream>
#include <thread>
#include <chrono>

#define LOG_CONNECTION_AMOUNT 100

ServerSocket::ServerSocket( const std::string &AHost, const int &APort ) :
    Host( AHost ),
    Port( APort ),
    clientSockets( new ClientSockets( this ) ) {}

ServerSocket::~ServerSocket()
{
    WSACleanup();
}

void ServerSocket::Open( void )
{
    WSADATA wsa;

    if ( WSAStartup( MAKEWORD( 2, 2 ), &wsa ) != 0 )
    {
        throw ServerException( "WSAStartup failed" );
    }

    if ( ( SocketHandle = socket( PF_INET, SOCK_STREAM, 0 ) ) == INVALID_SOCKET )
    {
        throw ServerException( "%s: unable to get a socket handle. errno: %ld", __PRETTY_FUNCTION__, errno );
    }

    char OptVal = 1;

    if ( setsockopt( SocketHandle, SOL_SOCKET, SO_REUSEADDR, &OptVal, sizeof( OptVal ) ) == -1 )
    {
        throw ServerException( "%s: setsockopt failed", __PRETTY_FUNCTION__ );
    }

    struct sockaddr_in ServerAddress;

    memset( & ServerAddress, 0x0, sizeof( ServerAddress ) );

    ServerAddress.sin_family = AF_INET;
    ServerAddress.sin_addr.s_addr = inet_addr( Host.c_str() );
    ServerAddress.sin_port = htons( Port );

    if ( bind( SocketHandle, ( struct sockaddr * ) &ServerAddress, sizeof( ServerAddress ) ) == -1 )
    {
        throw ServerException( "%s: binding server address failed", __PRETTY_FUNCTION__ );
    }

    // change N° 2:
    // a backlog parameter of "10" is not enough. SOMAXCONN should be the preferable value
    // if ( listen( SocketHandle, 10 ) == -1 )
    if ( listen( SocketHandle, SOMAXCONN ) == -1 )
    {
        throw ServerException( "%s: listen socket failed", __PRETTY_FUNCTION__ );
    }

    FD_ZERO( &FileDescriptorServer );
    FD_SET( SocketHandle, &FileDescriptorServer );

    std::cout << "socket server fd '" << SocketHandle << "' listening on " << Host << ":" << Port << std::endl;
}

void ServerSocket::Run( void )
{
    std::cout << __PRETTY_FUNCTION__  << std::endl;

    do
    {
        try
        {
            if ( !clientSockets->HandleConnnections() )
            {
                continue;
            }

            clientSockets->Read();
        }
        catch( ServerException const &e )
        {
            int ErrorCode = WSAGetLastError();

            std::cout << "error while server running: " << e.what() << ", code: " << ErrorCode << std::endl;

            if ( ErrorCode == 10038 ) // no socket error
            {
                // continue; // not sure if this is the correct way to do...
            }

            // if ( ErrorCode == 10014 ) {} // bad address error
            // if ( ErrorCode == 10054 ) {} // connection reset by peer

            break;
        }
        catch( ... )
        {
            std::cout << "some exception occurred" << std::endl;
        }
    } while ( true );
}

ClientSockets::ClientSockets( ServerSocket * AOwnerServer )
{
    OwnerServer = AOwnerServer;
}

void ClientSockets::Append( ClientSocket *AClientSocket )
{
    Items.insert( std::make_pair( AClientSocket->GetSocketHandle(), AClientSocket ) );
}

int ClientSockets::Count( void )
{
    return Items.size();
}

bool ClientSockets::HandleConnnections( void )
{
    FD_ZERO( & FileDescriptorClient );

    std::unordered_map< SOCKET, Socket *> SocketsRead;
    SocketsRead.insert( std::make_pair( OwnerServer->GetSocketHandle(), OwnerServer ) );

    for ( auto client : Items )
    {
        SocketsRead.insert( client );
    }

    for ( auto Socket : SocketsRead )
    {
        if ( FD_ISSET( Socket.first, & OwnerServer->FileDescriptorServer ) )
        {
            FD_SET( Socket.first, & FileDescriptorClient );
        }
    }

    struct timeval tv;
    tv.tv_sec = 0;
    tv.tv_usec = 3;

    int SocketChangedCount = 0;

    if ( ( SocketChangedCount = select( 0, &FileDescriptorClient, NULL, NULL, &tv ) ) == -1 )
    {
        throw ServerException( "%s: select failed", __PRETTY_FUNCTION__ );
    }

    if ( FD_ISSET( OwnerServer->GetSocketHandle(), &FileDescriptorClient ) )
    {
        struct sockaddr_in ClientAddress;

        socklen_t Length = sizeof( ClientAddress );

        SOCKET SocketHandle = 0;

        // change N° 1:
        // the return value of select is the number of new connections.
        // therefore we need to call accept for EACH new connection.
        // this issue did never light up, because the connections have
        // to be established almost simultaneously to trigger this issue.

        for ( int i = 0; i < SocketChangedCount; ++i )
        {
            if ( ( SocketHandle = accept( OwnerServer->GetSocketHandle(), (struct sockaddr*) &ClientAddress, &Length ) ) == INVALID_SOCKET )
            {
                throw ServerException( "%s: accept failed", __PRETTY_FUNCTION__ );
            }
            else
            {
                OnClientConnect( SocketHandle, ClientAddress );
            }

            FD_SET( SocketHandle, &OwnerServer->FileDescriptorServer );
        }
    }

    return ( SocketChangedCount > 0 );
}

void ClientSockets::OnClientConnect( SOCKET ASocketHandle, struct sockaddr_in AClientAddress )
{
#if LOG_CONNECTION_AMOUNT > 0
    static int ClientConnectionCounter = 0;

    if ( ClientConnectionCounter++ % LOG_CONNECTION_AMOUNT == 0 )
    {
        std::cout << "connect     fd: [" << ASocketHandle << "], serial: [" << ClientConnectionCounter << "], cnt: [" << Count() << "]" << std::endl;
    }
#endif

    ClientSocket *clientSocket = new ClientSocket( this, ASocketHandle );

    Append( clientSocket );
}

void ClientSockets::Read( void )
{
    for ( auto clientSocket : Items )
    {
        if ( FD_ISSET( clientSocket.first, &FileDescriptorClient ) )
        {
            char buf[ SOCKETBUFLEN + 1 ];
            memset( buf, 0x0, sizeof( buf ) );

            int BytesReceived = recv( clientSocket.first, buf, SOCKETBUFLEN, 0 );

            if ( BytesReceived > 0 )
            {
                std::string Message = ( std::string ) buf;

                if ( Message.substr( 0, 4 ) == "quit" )
                {
                    clientSocket.second->Close();
                    continue;
                    // break;
                }
                else
                {
                    // ... do fancy stuff here...
                    std::cout << "read from socket: " << Message << std::endl;
                }
            }
            else if ( BytesReceived == 0 )
            {
                clientSocket.second->Close();
                // change N° 4:
                // breaking the loop is a really bad idea
                continue;
                // break;
            }
            else
            {
                throw ServerException( "reading data failed" );
            }
        }
    }
}



void ClientSockets::Remove( SOCKET ASocketHandle )
{
    FD_CLR( ASocketHandle, & FileDescriptorClient );

/*
    // whatever this code did: it was wrong
    std::vector< ClientSocket * >::iterator it =
        Items.erase(
            std::remove_if(
                Items.begin(),
                Items.end(),
                [ ASocketHandle ]( Socket *sock ){ return sock->GetSocketHandle() == ASocketHandle; }
            )
        );

    delete *it;
*/

    std::unordered_map< SOCKET, ClientSocket * >::iterator it = Items.find( ASocketHandle );

    if ( it != Items.end() )
    {
        Items.erase( it );
    }
}


void ClientSocket::Close( void )
{
#if LOG_CONNECTION_AMOUNT > 0
    static int ClientDisconnectionCounter = 0;

    if ( ClientDisconnectionCounter++ % LOG_CONNECTION_AMOUNT == 0 )
    {
        std::cout << "disconnnect fd: [" << SocketHandle << "], serial: [" << ClientDisconnectionCounter << "], cnt: [" << FClientSockets->Count() << "]" << std::endl;
    }
#endif

    if ( closesocket( SocketHandle ) == SOCKET_ERROR )
    {
        throw ClientException( "closing socket failed: %ld", errno );
    }

    FClientSockets->Remove( SocketHandle );
}

ClientSocket::ClientSocket( ClientSockets *AClientSockets, SOCKET ASocketHandle ) :
    Socket( ASocketHandle ),
    FClientSockets( AClientSockets ) {}

Main.cpp остается свободным. Большое спасибо!

0 голосов
/ 27 июня 2018

Я не совсем уверен, что не так с вашим кодом - для меня слишком сложно следовать должным образом, поэтому вместо этого я разместил собственный код, который работает правильно, который вы можете использовать в качестве руководства.

Этот код отлично работает, используя Sleep(1) в клиенте (хотя это не совсем то, что вы, вероятно, думаете, что оно делает). Ниже приведены некоторые примечания.

#define FD_SETSIZE      4096

#include <WinSock2.h>                   // ** before** windows.h
#include <WS2tcpip.h>
#include <windows.h>

#include <iostream>
#include <set>
#include <assert.h>

#pragma comment (lib, "ws2_32.lib")

const int port = 24442;
std::set <SOCKET> connected_sockets;

// main
int main (char argc, char* argv[])
{
    WSADATA wsadata;
    WORD version = MAKEWORD(2, 2);

    int err = WSAStartup (MAKEWORD (2, 2), &wsadata);
    if (err)
    {
        std::cout << "WSAStartup failed, error: " << err << std::endl;
        return 255;
    }

    char buf [512];
    bool is_client = argc > 1 && _stricmp (argv [1], "client") == 0;

    if (is_client)
    {
        int lap = 0;
        for ( ; ; )
        {
            // Client
            SOCKET skt = socket (AF_INET, SOCK_STREAM, 0);
            assert (skt != INVALID_SOCKET);

            sockaddr_in server_address = { };
            server_address.sin_family = AF_INET;
            server_address.sin_port = htons (port);
            inet_pton (AF_INET, "192.168.1.2", &server_address.sin_addr);

            std::cout << ++lap << ": Connecting..." << std::endl;
            int err = connect (skt, (const sockaddr *) &server_address, sizeof (server_address));

            if (err)
            {
                std::cout << "connect() failed, error: " << WSAGetLastError () << std::endl;
                Sleep (50);
                continue;
            }

            memset (buf, 'Q', sizeof (buf));
            std::cout << "Sending..." << std::endl;

            if (send (skt, buf, sizeof (buf), 0) == SOCKET_ERROR)
                std::cout << "send() failed, error: " << WSAGetLastError () << std::endl;

            std::cout << "Disconnecting..." << std::endl;
            closesocket (skt);
            Sleep (1);
        }

        WSACleanup ();
        return 0;
    }

    // Server
    SOCKET listener_skt = socket (AF_INET, SOCK_STREAM, 0);
    assert (listener_skt != INVALID_SOCKET);
    sockaddr_in receive_address = { };
    receive_address.sin_family = AF_INET;
    receive_address.sin_port = htons (port);
    receive_address.sin_addr.s_addr = htonl (INADDR_ANY);

    if (bind (listener_skt, (const sockaddr *) &receive_address, sizeof (receive_address)) == -1)
    {
        std::cout << "bind failed , error: " << errno << std::endl;
        return 255;
    }

    if (listen (listener_skt, 256) == SOCKET_ERROR)
    {
        std::cout << "listen() failed, error: " << WSAGetLastError () << std::endl;
        return 255;
    }

    std::cout << "Listening..." << std::endl;

    for ( ; ; )
    {
        fd_set read_fds;
        FD_ZERO (&read_fds);
        FD_SET (listener_skt, &read_fds);    // listener_skt must be first

        int n_fds = 0;
        for (auto skt : connected_sockets)
        {
            FD_SET (skt, &read_fds);
            if (++n_fds >= FD_SETSIZE - 1)
                break;
        }

        n_fds = select (0, &read_fds, NULL, NULL, NULL);
        if (n_fds <= 0)
        {
            if (n_fds < 0)
                std::cout << "select failed, error: " << WSAGetLastError () << std::endl;
            continue;
        }

        int i = 0;
        if (read_fds.fd_array [i] == listener_skt)
        {
            SOCKET skt = accept (listener_skt, NULL, 0);
            if (skt == SOCKET_ERROR)
                std::cout << "accept() failed, error: " << WSAGetLastError () << std::endl;
            else
            {
                std::cout << "Accepted connection on socket: " << skt << std::endl;
                connected_sockets.insert (skt);
            }
            ++i;
        }

        while (i < n_fds)
        {
            SOCKET skt = read_fds.fd_array [i];
            int nbytes = recv (skt, buf, sizeof (buf), 0);
            if (nbytes > 0)
                std::cout << "Received " << nbytes << " bytes on socket " << skt << std::endl;
            else
            {
                std::cout << "Socket " << skt << " disconnected, code: " << nbytes << std::endl;
                closesocket (skt);
                connected_sockets.erase (skt);
            }
            ++i;
        }
    }

    closesocket (listener_skt);
    WSACleanup ();
    return 0;
}

Использование:

test_program - для запуска в качестве сервера

test_program client - для запуска в качестве клиента

Примечания:

  1. Вы должны # определить (и затем уважать) FD_SETSIZE, прежде чем #include WinSock2.h. Ваш код этого не делает (= потенциальная перезапись памяти, которая может быть одной из причин, вызывающих у вас проблемы).
  2. Ваш параметр backlog для listen() слишком мал и приведет к тому, что клиентам сообщат, что они не могут подключиться, когда сервер занят (см. здесь ). Я выбрал 256, но значение, которое вы выбираете, должно зависеть от вашего варианта использования и результатов теста.
  3. Лучшим стресс-тестом (который является одним из заданных вами вопросов) является одновременное подключение нескольких клиентов к серверу с запросами на соединение с (я бы сказал) более реалистичным временем Sleep в клиенте. Я пробовал 5 клиентов с Sleep(50), и он работал без перебоев 15 минут или более.
  4. A std::set - намного лучший способ для сервера отслеживать свои подключенные SOCKET s, чем vector, см. Код. Это очень много для того, что std::set было разработано.

Надеюсь, это укажет путь. Дайте мне знать, если у вас есть какие-либо вопросы (мне нравится писать сетевой код:).

...