Как правильно использовать std :: mutex в C ++ без тупиков и / или гонок? - PullRequest
0 голосов
/ 25 октября 2018

Я пытаюсь отладить программу, которую пытаюсь запустить параллельно.Я в недоумении, почему у меня возникают взаимоблокировки и условия гонки, когда я пытаюсь скомпилировать и запустить код на C ++.Вот весь соответствующий код, который я написал до сих пор.

// define job struct here

// define mutex, condition variable, deque, and atomic here
std::deque<job> jobList;
std::mutex jobMutex;
std::condition_variable jobCondition;
std::atomic<int> numberThreadsRunning;

void addJobs(...insert parameters here...)
{
        job current = {...insert parameters here...};
        jobMutex.lock();
        std::cout << "We have successfully acquired the mutex." << std::endl;
        jobList.push_back(current);
        jobCondition.notify_one();
        jobMutex.unlock();
        std::cout << "We have successfully unlocked the mutex." << std::endl;
}

void work(void) {
        job* current;
        numberThreadsRunning++;
        while (true) {
                std::unique_lock<std::mutex> lock(jobMutex);
                if (jobList.empty()) {
                        numberThreadsRunning--;
                        jobCondition.wait(lock);
                        numberThreadsRunning++;
                }
                current = &jobList.at(0);
                jobList.pop_front();
                jobMutex.unlock();

                std::cout << "We are now going to start a job." << std::endl;

                ////Call an expensive function for the current job that we want to run in parallel.
                ////This could either complete the job, or spawn more jobs, by calling addJobs. 
                ////This recursive behavior typically results in there being thousands of jobs.

                std::cout << "We have successfully completed a job." << std::endl;
        }
        numberThreadsRunning--;
        std::cout << "There are now " << numberThreadsRunning << " threads running." << std::endl;
}

int main( int argc, char *argv[] ) {
        //Initialize everything and add first job to the deque.
        std::thread jobThreads[n]

        for (int i = 0; i < n; i++) {
                jobThreads[i] = std::thread(work);
        }
        for (int i = 0; i < n; i++) {
                jobThreads[i].join();
        }
 }

Код компилируется, но в зависимости от случайных факторов он либо будет тупиковым в самом конце, либо будет иметь ошибку сегментации в середине, покаочередь еще довольно большая.Кто-нибудь знает больше о том, почему это происходит?

...

РЕДАКТИРОВАТЬ: я отредактировал этот вопрос, чтобы включить дополнительную информацию и более полный пример.Хотя я, конечно, не хочу утомлять вас тысячами строк кода, которые у меня есть (пакет рендеринга изображений), я считаю, что этот пример лучше отражает тип проблемы, с которой я сталкиваюсь.Пример, приведенный в ответе Алана Биртлза, работает только с очень простой структурой работы с очень простой функциональностью.В реальной структуре задания есть несколько указателей на разные векторы и матрицы, и поэтому нам нужны указатели на структуру задания, иначе компилятор не сможет скомпилироваться, потому что функция конструктора была «неявно удалена».

IПоверьте, ошибка, с которой я сталкиваюсь, связана с тем, как я блокирую и разблокирую потоки.Я знаю, что указатели также вызывают некоторые проблемы, но они, вероятно, должны остаться.Функция thisFunction() представляет функцию, которую необходимо запустить параллельно.

#include <queue>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <iostream>
#include <cmath>

struct job {
        std::vector<std::vector<int>>   &matrix;
        int num;
};
bool closed = false;

std::deque<job> jobList;
std::mutex jobMutex;
std::condition_variable jobCondition;
std::atomic<int> numberThreadsRunning;
std::atomic<int> numJobs;

struct tcout
{
    tcout() :lock(mutex) {}

    template < typename T >
    tcout& operator<< (T&& t)
    {
        std::cout << t;
        return *this;
    }

    static std::mutex mutex;
    std::unique_lock< std::mutex > lock;
};
std::mutex tcout::mutex;


std::vector<std::vector<int>> multiply4x4(
                std::vector<std::vector<int>> &A,
                std::vector<std::vector<int>> &B) {
        //Only deals with 4x4 matrices
        std::vector<std::vector<int>> C(4, std::vector<int>(4, 0));
        for (int i = 0; i < 4; i++) {
                for (int j = 0; j < 4; j++) {
                        for (int k = 0; k < 4; k++) {
                                C.at(i).at(j) = C.at(i).at(j) + A.at(i).at(k) * B.at(k).at(j);
                        }
                }
        }

        return C;
}

void addJobs()
{
    numJobs++;
    std::vector<std::vector<int>> matrix(4, std::vector<int>(4, -1)); //Create random 4x4 matrix
    for (int i = 0; i < 4; i++) {
            for (int j = 0; j < 4; j++) {
                    matrix.at(i).at(j) = rand() % 10 + 1;
            }
    }
    job current = { matrix, numJobs };
    std::unique_lock<std::mutex> lock(jobMutex);

    std::cout << "The matrix for job " << current.num << " is: \n";
    for (int i = 0; i < 4; i++) {
            for (int j = 0; j < 4; j++) {
                    std::cout << matrix.at(i).at(j) << "\t";
            }
            std::cout << "\n";
    }

    jobList.push_back(current);
    jobCondition.notify_one();
    lock.unlock();
}

void thisFunction(std::vector<std::vector<int>> &matrix, int num)
{
        std::this_thread::sleep_for(std::chrono::milliseconds(rand() * 500 / RAND_MAX));
        std::vector<std::vector<int>> product = matrix;

        std::unique_lock<std::mutex> lk(jobMutex);
        std::cout << "The imported matrix for job " << num << " is: \n";
        for (int i = 0; i < 4; i++) {
                for (int j = 0; j < 4; j++) {
                        std::cout << product.at(i).at(j) << "\t";
                }
                std::cout << "\n";
        }
        lk.unlock();

        int power;
        if (num % 2 == 1) {
                power = 3;
        } else if (num % 2 == 0) {
                power = 2;
                addJobs();
        }
        for (int k = 1; k < power; k++) {
                product = multiply4x4(product, matrix);
        }

        std::unique_lock<std::mutex> lock(jobMutex);
        std::cout << "The matrix for job " << num << " to the power of " << power << " is: \n";
        for (int i = 0; i < 4; i++) {
                for (int j = 0; j < 4; j++) {
                        std::cout << product.at(i).at(j) << "\t";
                }
                std::cout << "\n";
        }
        lock.unlock();

}

void work(void) {
    job *current;
    numberThreadsRunning++;
    while (true) {
        std::unique_lock<std::mutex> lock(jobMutex);
        if (jobList.empty()) {
            numberThreadsRunning--;
            jobCondition.wait(lock, [] {return !jobList.empty() || closed; });
            numberThreadsRunning++;
        }
        if (jobList.empty())
        {
            break;
        }
        current = &jobList.front();
        job newcurrent = {current->matrix, current->num};
        current = &newcurrent;
        jobList.pop_front();
        lock.unlock();
        thisFunction(current->matrix, current->num);
        tcout() << "job " << current->num << " complete\n";
    }
    numberThreadsRunning--;
}


int main(int argc, char *argv[]) {
    const size_t n = 1;
    numJobs = 0;
    std::thread jobThreads[n];
    std::vector<int> buffer;

    for (int i = 0; i < n; i++) {
        jobThreads[i] = std::thread(work);
    }
    for (int i = 0; i < 100; i++)
    {
        addJobs();
    }
    {
        std::unique_lock<std::mutex> lock(jobMutex);
        closed = true;
        jobCondition.notify_all();
    }
    for (int i = 0; i < n; i++) {
        jobThreads[i].join();
    }
}

Ответы [ 2 ]

0 голосов
/ 25 октября 2018

Вот полностью рабочий пример:

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <iostream>

struct job { int num; };
bool closed = false;

std::deque<job> jobList;
std::mutex jobMutex;
std::condition_variable jobCondition;
std::atomic<int> numberThreadsRunning;

struct tcout
{
    tcout() :lock(mutex) {}

    template < typename T >
    tcout& operator<< (T&& t)
    {
        std::cout << t;
        return *this;
    }

    static std::mutex mutex;
    std::unique_lock< std::mutex > lock;
};
std::mutex tcout::mutex;

void addJobs()
{
    static int num = 0;
    job current = { num++ };
    std::unique_lock<std::mutex> lock(jobMutex);
    jobList.push_back(current);
    jobCondition.notify_one();
    lock.unlock();
}

void work(void) {
    job current;
    numberThreadsRunning++;
    while (true) {
        std::unique_lock<std::mutex> lock(jobMutex);
        if (jobList.empty()) {
            numberThreadsRunning--;
            jobCondition.wait(lock, [] {return !jobList.empty() || closed; });
            numberThreadsRunning++;
        }
        if (jobList.empty())
        {
            break;
        }
        current = jobList.front();
        jobList.pop_front();
        lock.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(rand() * 500 / RAND_MAX));
        tcout() << "job " << current.num << " complete\n";
    }
    numberThreadsRunning--;
}

int main(int argc, char *argv[]) {
    const size_t n = 4;
    std::thread jobThreads[n];

    for (int i = 0; i < n; i++) {
        jobThreads[i] = std::thread(work);
    }
    for (int i = 0; i < 100; i++)
    {
        addJobs();
    }
    {
        std::unique_lock<std::mutex> lock(jobMutex);
        closed = true;
        jobCondition.notify_all();
    }
    for (int i = 0; i < n; i++) {
        jobThreads[i].join();
    }
}

Я внес следующие изменения:

  1. Никогда не звоните lock() или unlock() на std::mutex, всегда используйте std::unique_lock (или похожие классы).Вы вызывали jobMutex.unlock() в work(), так как мьютекс, который вы заблокировали с помощью std::unique_lock, std::unique_lock во второй раз вызвал бы unlock, что привело бы к неопределенному поведению.Если в addJobs было сгенерировано исключение, то, поскольку вы вообще не использовали std::unique_lock, мьютекс останется заблокированным.
  2. Вам необходимо использовать предикат для jobCondition.wait, иначе ложное пробуждение может вызватьдождитесь возврата, пока jobList еще пусто.
  3. Я добавил переменную closed, чтобы заставить программу завершиться, когда больше нет работы
  4. Я добавил определениеиз job
  5. В work вы берете указатель на элемент в очереди, а затем извлекаете его из очереди, так как элемент больше не существует, указатель свисает.Вам нужно скопировать элемент, прежде чем выскочить в очередь.Если вы хотите избежать копирования, либо сделайте вашу структуру job подвижной, либо измените очередь на хранилище std::unique_ptr<job> или std::shared_ptr<job>
  6. Я также добавил поточно-ориентированную версию std::cout, это нене является строго необходимым, но останавливает ваши выходные строки перекрывая друг друга.В идеале вам следует использовать надлежащую многопотоковую библиотеку журналов, поскольку блокировка мьютекса для каждого отпечатка обходится дорого, и если у вас достаточно отпечатков, ваша программа практически однопоточна
0 голосов
/ 25 октября 2018

Замените job* current; на job current;, а затем current = jobList.at(0);.В противном случае вы получите указатель на элемент jobList, который не существует после jobList.pop_front().

Заменить if (jobList.empty()) на while(jobList.empty()) для обработки ложных пробуждений.

...