Входящие соединения сокетов не могут одновременно передавать элементы push_back глобально определенному std :: vector - PullRequest
1 голос
/ 20 июня 2019

Я новичок в программировании сокетов, и в этот момент я столкнулся с проблемой, которую не могу решить. Я читал из нескольких источников, что контейнеры стандартного шаблона (STL) C ++ не являются поточно-ориентированными , поэтому один программист должен навязать механизм, который гарантирует, что несколько потоков не изменяют данные контейнер одновременно.

Например, Безопасность потока std :: vector push_back и reserve

Я использовал класс std::mutex, чтобы при программировании threads никто не записывал данные в один и тот же контейнер одновременно. Тем не менее, это не работает для меня, когда я использую sockets.

Предположим, у меня есть 4 клиента, каждый из которых отправляет данные (int) на сервер в следующем порядке:

client_0: 4
client_1: 8
client_2: 5
client_4: 7

Обратите внимание на следующий код для простого сервера:

#define PORT 60000

#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <vector>
#include <string>
#include <iostream>
#include <mutex>

using namespace std;

vector<int> inputQueue; //<--------!
mutex mtx; //<---------------------!

void printVector(vector<int> input) {
    cout << "inputQueue: [";
    for (unsigned int i = 0; i < input.size(); i++ ) {
        if (i != input.size() - 1)
            cout << input[i] << ", ";
        else
            cout << input[i];
    }
    cout << "]." << endl;
}

int main(int argc, char const *argv[])
{
    int server_fd, client_fd;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);

    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons( PORT );
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address))<0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }
    if (listen(server_fd, 10) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }
    while(1) {
        char buffer[4];
        if ((client_fd = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen))<0) {
            perror("accept");
            exit(EXIT_FAILURE);
        }
        if (!fork()) {
            recv(client_fd, buffer, 4, MSG_WAITALL);
            int receivedInt = int(
                    (unsigned char)(buffer[0]) << 24 |
                    (unsigned char)(buffer[1]) << 16 |
                    (unsigned char)(buffer[2]) << 8 |
                    (unsigned char)(buffer[3])
            );
            mtx.lock(); //<-------------------------------------!
            inputQueue.push_back(receivedInt); //<--------------!
            cout << "Client context. Integer registered: " << receivedInt << ": inputQueue length is " << inputQueue.size() << endl;
            printVector(inputQueue); //<------------------------!
            mtx.unlock(); //<-----------------------------------!
            close(server_fd); close(client_fd);
        }
        cout << "Server context: inputQueue length is " << inputQueue.size() << endl;
        printVector(inputQueue);
    }
    return 0;
}

Сервер должен получать данные, удостоверяясь, что они делают это в том же порядке, и регистрируя свои соответствующие данные в векторе целых чисел, то есть std::vector<int> inputQueue, используя метод push_back(), чтобы inputQueue = {4, 8, 5, 7} в окончание приема всех данных клиентами.

Я должен уточнить, что inputQueue - это глобальная переменная, которая при запуске сервера не содержит элементов, но они добавляются при регистрации клиентов.

Проблема в том, что ни один из клиентов не регистрирует элементы в inputQueue. Обратите внимание, что в следующем коде, в зависимости от того, куда вы поместили инструкцию cout <<, вы можете увидеть, что размер inputQueue отличается. Это показывает, что в контексте клиента каждый клиент перезаписывает первый элемент inputQueue, но вне его никто из клиентов не может зарегистрировать один элемент в inputQueue.

По-видимому, у каждого сокета есть своя собственная копия inputQueue, поэтому при уничтожении измененная копия inputQueue также уничтожается.

Вывод следующий:

Server context: inputQueue length is 0
inputQueue: [].
Client context. Integer registered: 4: inputQueue length is 1
inputQueue: [4].
Server context: inputQueue length is 1
inputQueue: [4].
Server context: inputQueue length is 0
inputQueue: [].
Client context. Integer registered: 8: inputQueue length is 1
inputQueue: [8].
Server context: inputQueue length is 0
inputQueue: [].
Server context: inputQueue length is 1
inputQueue: [8].
Client context. Integer registered: 5: inputQueue length is 1
inputQueue: [5].
Server context: inputQueue length is 1
inputQueue: [5].
Server context: inputQueue length is 0
inputQueue: [].
Client context. Integer registered: 7: inputQueue length is 1
inputQueue: [7].
Server context: inputQueue length is 1
inputQueue: [7].

Кто-нибудь знает, почему это происходит и как они могут это решить? Я надеюсь, что вы можете мне помочь. Спасибо

Ответы [ 2 ]

3 голосов
/ 20 июня 2019
if (!fork()) {

fork() создает совершенно новый, независимый процесс с собственным адресным пространством виртуальной памяти. Показанный код, очевидно, ожидает, что и дочерний процесс, и исходный процесс будут взаимодействовать через один и тот же объект, а именно вектор, заблокированный мьютексом.

Это не то, что происходит. Теперь у вас есть два совершенно независимых процесса. Это ничем не отличается от запуска вашей программы дважды, одновременно или по очереди. Ожидаете ли вы, что обе запущенные копии вашей программы будут каким-то образом использовать один и тот же вектор и мьютекс? Конечно нет.

Вместо этого вы хотите использовать std::thread для создания нового потока выполнения в том же процессе. Ваша книга C ++ должна иметь больше информации о том, как создавать новые потоки выполнения с std::thread.

Более того, даже если вы замените fork() аналогичным потоком выполнения: это все равно не решит все проблемы здесь. Вам также необходимо правильно обработать синхронизация между несколькими потоками выполнения. В частности: нет никаких гарантий, что новый поток выполнения вставит что-то в вектор, прежде чем другой поток выполнения попытается printVector его содержимое. Новый поток выполнения может сделать это до того, как исходный поток выполнения введет printVector. Или нет, и printVector находит полностью пустой вектор, потому что другому потоку выполнения не удалось вставить что-то в него достаточно быстро. Теперь у вас есть два полностью независимых потока выполнения, запущенных одновременно, и у вас нет никаких гарантий относительно того, какой поток что делает первым.

Вы можете даже получать разные результаты каждый раз, когда запускаете многопоточную версию показанной программы (и вы, вероятно, будете).

Когда вы будете готовы приступить к решению этой новой проблемы, ваша книга по C ++ объяснит, как использовать условные переменные вместе с мьютексами для правильной реализации многопоточной синхронизации. К сожалению, эта тема не может быть полностью рассмотрена в кратком ответе на stackoverflow.com, но в вашей книге по C ++ должно быть несколько специальных глав, где вы найдете больше информации.

P.S. Единственная причина, по которой ваши выходные данные показывают что-либо во входной очереди, заключается в том, что ничто не может остановить дочерний процесс, который продолжит выполнение программы, когда он завершит выполнение оператора if и сам завершится вызовом printVector. Это не от родительского процесса. Каждый дочерний процесс завершает печать значения, которое он сам вставил в свой собственный вектор.

2 голосов
/ 20 июня 2019

Как отмечает Майлз Буднек, вы создаете новый дочерний процесс. Сокеты являются глобальными объектами ОС, поэтому работают как положено. Ваш вектор и память, в которой он хранится, являются локальными для процесса и поэтому не могут быть доступны для вашего нового процесса.

Рассмотрим заглянуть в std :: thread: https://en.cppreference.com/w/cpp/thread/thread

Один из наиболее часто используемых методов запуска потока - лямбда.

#include <thread>
#include <iostream>

auto print_number(int number) -> void
{
    std::cout << number << std::endl; // This runs in the new thread.
}

int main()
{
    int num = 12;
    auto t = std::thread([num](){print_number(num);}); // Spawn new thread that calls the lambda

    t.join(); // Wait for thread to finish execution
    return 0;
}
...