Я пытаюсь отладить программу, которую пытаюсь запустить параллельно.Я в недоумении, почему у меня возникают взаимоблокировки и условия гонки, когда я пытаюсь скомпилировать и запустить код на C ++.Вот весь соответствующий код, который я написал до сих пор.
// define job struct here
// define mutex, condition variable, deque, and atomic here
std::deque<job> jobList;
std::mutex jobMutex;
std::condition_variable jobCondition;
std::atomic<int> numberThreadsRunning;
void addJobs(...insert parameters here...)
{
job current = {...insert parameters here...};
jobMutex.lock();
std::cout << "We have successfully acquired the mutex." << std::endl;
jobList.push_back(current);
jobCondition.notify_one();
jobMutex.unlock();
std::cout << "We have successfully unlocked the mutex." << std::endl;
}
void work(void) {
job* current;
numberThreadsRunning++;
while (true) {
std::unique_lock<std::mutex> lock(jobMutex);
if (jobList.empty()) {
numberThreadsRunning--;
jobCondition.wait(lock);
numberThreadsRunning++;
}
current = &jobList.at(0);
jobList.pop_front();
jobMutex.unlock();
std::cout << "We are now going to start a job." << std::endl;
////Call an expensive function for the current job that we want to run in parallel.
////This could either complete the job, or spawn more jobs, by calling addJobs.
////This recursive behavior typically results in there being thousands of jobs.
std::cout << "We have successfully completed a job." << std::endl;
}
numberThreadsRunning--;
std::cout << "There are now " << numberThreadsRunning << " threads running." << std::endl;
}
int main( int argc, char *argv[] ) {
//Initialize everything and add first job to the deque.
std::thread jobThreads[n]
for (int i = 0; i < n; i++) {
jobThreads[i] = std::thread(work);
}
for (int i = 0; i < n; i++) {
jobThreads[i].join();
}
}
Код компилируется, но в зависимости от случайных факторов он либо будет тупиковым в самом конце, либо будет иметь ошибку сегментации в середине, покаочередь еще довольно большая.Кто-нибудь знает больше о том, почему это происходит?
...
РЕДАКТИРОВАТЬ: я отредактировал этот вопрос, чтобы включить дополнительную информацию и более полный пример.Хотя я, конечно, не хочу утомлять вас тысячами строк кода, которые у меня есть (пакет рендеринга изображений), я считаю, что этот пример лучше отражает тип проблемы, с которой я сталкиваюсь.Пример, приведенный в ответе Алана Биртлза, работает только с очень простой структурой работы с очень простой функциональностью.В реальной структуре задания есть несколько указателей на разные векторы и матрицы, и поэтому нам нужны указатели на структуру задания, иначе компилятор не сможет скомпилироваться, потому что функция конструктора была «неявно удалена».
IПоверьте, ошибка, с которой я сталкиваюсь, связана с тем, как я блокирую и разблокирую потоки.Я знаю, что указатели также вызывают некоторые проблемы, но они, вероятно, должны остаться.Функция thisFunction()
представляет функцию, которую необходимо запустить параллельно.
#include <queue>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <iostream>
#include <cmath>
struct job {
std::vector<std::vector<int>> &matrix;
int num;
};
bool closed = false;
std::deque<job> jobList;
std::mutex jobMutex;
std::condition_variable jobCondition;
std::atomic<int> numberThreadsRunning;
std::atomic<int> numJobs;
struct tcout
{
tcout() :lock(mutex) {}
template < typename T >
tcout& operator<< (T&& t)
{
std::cout << t;
return *this;
}
static std::mutex mutex;
std::unique_lock< std::mutex > lock;
};
std::mutex tcout::mutex;
std::vector<std::vector<int>> multiply4x4(
std::vector<std::vector<int>> &A,
std::vector<std::vector<int>> &B) {
//Only deals with 4x4 matrices
std::vector<std::vector<int>> C(4, std::vector<int>(4, 0));
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
for (int k = 0; k < 4; k++) {
C.at(i).at(j) = C.at(i).at(j) + A.at(i).at(k) * B.at(k).at(j);
}
}
}
return C;
}
void addJobs()
{
numJobs++;
std::vector<std::vector<int>> matrix(4, std::vector<int>(4, -1)); //Create random 4x4 matrix
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
matrix.at(i).at(j) = rand() % 10 + 1;
}
}
job current = { matrix, numJobs };
std::unique_lock<std::mutex> lock(jobMutex);
std::cout << "The matrix for job " << current.num << " is: \n";
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
std::cout << matrix.at(i).at(j) << "\t";
}
std::cout << "\n";
}
jobList.push_back(current);
jobCondition.notify_one();
lock.unlock();
}
void thisFunction(std::vector<std::vector<int>> &matrix, int num)
{
std::this_thread::sleep_for(std::chrono::milliseconds(rand() * 500 / RAND_MAX));
std::vector<std::vector<int>> product = matrix;
std::unique_lock<std::mutex> lk(jobMutex);
std::cout << "The imported matrix for job " << num << " is: \n";
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
std::cout << product.at(i).at(j) << "\t";
}
std::cout << "\n";
}
lk.unlock();
int power;
if (num % 2 == 1) {
power = 3;
} else if (num % 2 == 0) {
power = 2;
addJobs();
}
for (int k = 1; k < power; k++) {
product = multiply4x4(product, matrix);
}
std::unique_lock<std::mutex> lock(jobMutex);
std::cout << "The matrix for job " << num << " to the power of " << power << " is: \n";
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
std::cout << product.at(i).at(j) << "\t";
}
std::cout << "\n";
}
lock.unlock();
}
void work(void) {
job *current;
numberThreadsRunning++;
while (true) {
std::unique_lock<std::mutex> lock(jobMutex);
if (jobList.empty()) {
numberThreadsRunning--;
jobCondition.wait(lock, [] {return !jobList.empty() || closed; });
numberThreadsRunning++;
}
if (jobList.empty())
{
break;
}
current = &jobList.front();
job newcurrent = {current->matrix, current->num};
current = &newcurrent;
jobList.pop_front();
lock.unlock();
thisFunction(current->matrix, current->num);
tcout() << "job " << current->num << " complete\n";
}
numberThreadsRunning--;
}
int main(int argc, char *argv[]) {
const size_t n = 1;
numJobs = 0;
std::thread jobThreads[n];
std::vector<int> buffer;
for (int i = 0; i < n; i++) {
jobThreads[i] = std::thread(work);
}
for (int i = 0; i < 100; i++)
{
addJobs();
}
{
std::unique_lock<std::mutex> lock(jobMutex);
closed = true;
jobCondition.notify_all();
}
for (int i = 0; i < n; i++) {
jobThreads[i].join();
}
}