Recv () возвращает SOCKET_ERROR, когда я подключаю клиента к серверу вместо блокировки и ожидания сообщения - PullRequest
0 голосов
/ 13 января 2019

Я относительно новичок в сетевом программировании и многопоточности в C ++. В настоящее время мой вызов recv () возвращает неизвестную ошибку. Я не совсем уверен, откуда возникла ошибка в данный момент, и был бы признателен за помощь.

Я использовал putty для локального подключения к серверу

class Threaded_TCPListener{

    int Threaded_TCPListener::Init()
    {
        // Initializing WinSock
        WSADATA wsData;
        WORD ver = MAKEWORD(2,2);

        int winSock = WSAStartup(ver, &wsData);
        if(winSock != 0)
            return winSock;

        // Creating listening socket
        this->socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

        if(this->socket == INVALID_SOCKET)
            return WSAGetLastError();

        // Fill sockaddr with ip addr and port
        sockaddr_in hint;
        hint.sin_family = AF_INET;
        hint.sin_port = htons(this->port);
        inet_pton(AF_INET, this->ipAddress, &hint.sin_addr);

        // Bind hint to socket
        if(bind(this->socket, (sockaddr*)&hint, sizeof(hint)) == SOCKET_ERROR)
            return WSAGetLastError();

        // Start listening on socket
        if(listen(this->socket, SOMAXCONN) == SOCKET_ERROR)
            return WSAGetLastError();

        // Accept first client
        this->createAcceptThread();

        return 0;
    }

    int Threaded_TCPListener::Run()
    {
        bool isRunning = true;

        // Read from all clients
        std::vector<std::thread> threads;
        threads.reserve(this->clients.size());

        // Recv from client sockets
        for (int i=0; i < this->clients.size(); ++i)
        {
            threads.emplace_back(std::thread(&Threaded_TCPListener::receiveFromSocket, this, socket));
        }

        // Wait for all threads to finish
        for(std::thread& t : threads)
        {
            t.detach();
        }

        return 0;
    }

    void Threaded_TCPListener::onMessageReceived(int clientSocket, const char* msg, int length)
    {
        Threaded_TCPListener::broadcastToClients(clientSocket, msg, length);

        std::thread t(&Threaded_TCPListener::receiveFromSocket, this, clientSocket);

        t.detach();

        return;
    }

    void Threaded_TCPListener::sendMessageToClient(int clientSocket, const char * msg, int length)
    {
        send(clientSocket, msg, length, 0);

        return;
    }

    void Threaded_TCPListener::broadcastToClients(int senderSocket, const char * msg, int length)
    {
        std::vector<std::thread> threads;
        threads.reserve(clients.size());

        // Iterate over all clients
        for (int sendSock : this->clients)
        {
            if(sendSock != senderSocket)
                threads.emplace_back(std::thread(&Threaded_TCPListener::sendMessageToClient, this,sendSock, msg, length));
        }

        // Wait for all threads to finish
        for(std::thread& t : threads)
            t.join();

        return;
    }

    void Threaded_TCPListener::createAcceptThread()
    {
        // Start accepting clients on a new thread
        this->listeningThread = std::thread(&Threaded_TCPListener::acceptClient, this);

        this->listeningThread.detach();

        return;
    }

    void Threaded_TCPListener::acceptClient()
    {
        int client = accept(this->socket, nullptr, nullptr);

        // Error
        if(client == INVALID_SOCKET)
        {
            std::printf("Accept Err: %d\n", WSAGetLastError());
        }
        // Add client to clients queue
        else
        {
            // Add client to queue
            this->clients.emplace(client);

            // Client Connect Confirmation
            onClientConnected(client); // Prints msg on server

            // Create another thread to accept more clients
            this->createAcceptThread();
        }

        return;
    }

    void Threaded_TCPListener::receiveFromSocket(int receivingSocket)
    {
        // Byte storage
        char buff[MAX_BUFF_SIZE];

        // Clear buff
        memset(buff, 0, sizeof(buff));

        // Receive msg
        int bytesRecvd = recv(receivingSocket, buff, MAX_BUFF_SIZE, 0);
        if(bytesRecvd <= 0)
        {
            char err_buff[1024];
            strerror_s(err_buff, bytesRecvd);

            std::cerr << err_buff;
            // Close client
            this->clients.erase(receivingSocket);
            closesocket(receivingSocket);

            onClientDisconnected(receivingSocket); // Prints msg on server
        }
        else
        {
            onMessageReceived(receivingSocket, buff, bytesRecvd);
        }
    }
}

Я пытаюсь создать многопоточный TCP-сервер, который будет обрабатывать входящие клиенты, имея постоянно работающий поток приема (прослушивание новых подключений) и ожидающий поток с блоком recv для каждого клиента, подключенного к серверу.

Ответы [ 2 ]

0 голосов
/ 13 января 2019
  1. Ваш Init выглядит хорошо:

    • создать сокет, связать его, прослушать его, начать принимать поток
  2. В вашей принимающей ветке acceptClient выглядит нормально:

    • распечатать сообщение
    • добавить сокет клиента в clients очередь
    • создать новую принимающую тему
  3. Ваш Run не имеет смысла:

    • создать один поток на элемент в clients для получения от прослушивания socket

Похоже, что вы создаете новый поток для каждого отдельного действия сокета. Это довольно расточительный дизайн. Как только поток завершен, он может снова заняться чем-то другим.

Таким образом, создание нового потока принятия в acceptClient - пустая трата времени, вы можете просто вернуться к началу ::accept следующего клиента. Вот так:

acceptClient() {
  while (alive) {
    int client = accept(socket, ...);
    createClientHandler(client);
  }
}

То, что, по-видимому, отсутствует, порождает новый клиентский поток для обслуживания клиентского сокета. В настоящее время вы делаете это в Run, но это до того, как любой из клиентов будет принят. И вы делаете это для неправильного сокета! Вместо этого вы должны порождать потоки receiveFromSocket в acceptClient, и , передавая им client socket. Так что это ошибка.

В вашем receiveFromSocket вам также не нужно создавать еще один поток для receiveFromSocket снова - просто вернитесь к началу.

Наибольшее беспокойство в связи с этим проектом «поток за действие» заключается в том, что вы создаете потоки отправителей для каждого входящего сообщения. Это означает, что у вас может быть несколько потоков отправителей, пытающихся набрать ::send в одном и том же сокете TCP. Это не очень безопасно.

Порядок вызовов, выполняемых в WSASend, также является порядком, в котором буферы передаются на транспортный уровень. WSASend не должен вызываться на одном и том же потоково-ориентированном сокете одновременно из разных потоков, поскольку некоторые провайдеры Winsock могут разделить большой запрос на отправку на несколько передач, и это может привести к непреднамеренному чередованию данных из нескольких одновременных запросов на отправку в одном и том же ориентированном на поток розетка.

https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsasend

Аналогично, вместо того, чтобы создавать потоки в broadcastToClients, я предлагаю вам создать один постоянный поток отправителя на каждый сокет клиента в acceptClient (вместе с потоком receiveFromSocket внутри некоторого createClientHandler).

Для связи с потоками отправителя вы должны использовать очереди блокировки потоков. Каждый поток отправителя будет выглядеть так:

while (alive) {
  msg = queue.next_message();
  send(client_socket, msg);
}

Затем в полученном сообщении вы просто делаете:

for (client : clients) {
  client.queue.put_message(msg);
}

Итак, подведем итог: для обработки каждого клиента вам нужна такая структура:

struct Client {
  int client_socket;
  BlockingQueue queue;

  // optionally if you want to keep track of your threads
  // to be able to safely clean up
  std::thread recv_thread, send_thread;
};

Безопасная очистка - это совсем другая история.


Наконец, комментарий к этому комментарию в вашем коде:

        // Wait for all threads to finish
        for(std::thread& t : threads)
        {
            t.detach();
        }

Это почти противоположно тому, что std::thread::detach делает: https://en.cppreference.com/w/cpp/thread/thread/detach Это позволяет вам уничтожить объект потока, не дожидаясь окончания выполнения потока.

0 голосов
/ 13 января 2019

В коде неверное представление о том, как должен быть реализован TCP-сервер:

Похоже, вы предполагаете, что у вас может быть один дескриптор файла сокета сервера, который может обрабатывать все коммуникации. Это не вариант. У вас должен быть один выделенный дескриптор файла сокета, который просто используется для прослушивания и приема входящих соединений, а затем у вас есть один дополнительный файловый дескриптор для каждого существующего соединения.

В вашем коде я вижу, что вы вызываете receiveFromSocket() всегда с прослушивающим сокетом. Это не верно. Также неправильно вызывать receiveFromSocket() в цикле для всех клиентов.

Что вам скорее всего нужно сделать, это: - Иметь один выделенный поток, который вызывает accept () в цикле. Вызов производительности не вызывается при вызове accept () из нескольких потоков. - Один accept () возвращает новое соединение, которое вы создали, новый поток, который вызывает recv () в цикле. Затем он будет блокировать и ждать новых данных, как вы ожидаете в своем вопросе.

Вам также необходимо отказаться от привычки вызывать отдельные функции из новых потоков. Это не многопоточное программирование. Поток обычно содержит цикл. Все остальное обычно является недостатком дизайна.

Также обратите внимание, что многопоточное программирование все еще остается ракетостроением в 2019 году, особенно в C ++. Если вы не являетесь абсолютным экспертом, вы не сможете это сделать. Также обратите внимание, что абсолютные эксперты в многопоточном программировании будут стараться по возможности избегать многопоточного программирования. Многие, казалось бы, параллельные задачи, связанные с вводом / выводом, могут лучше обрабатываться системой, основанной на однопоточных событиях.

...