Я реализую пул потоков, в котором рабочие спят, когда работа не готова, а основной поток спит, когда рабочие заняты. Я заметил, что рабочие потоки продолжают работать после вызова wait()
, хотя основной поток не notify_all()
.
Вывод выглядит примерно так:
WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
......
Рабочая функция:
void TaskSystemParallelThreadPoolSleeping::waitFunc() {
std::unique_lock<std::mutex> lock(*this->mutex_);
while(true) {
this->num_wait++;
std::cout << "WORKER WAIT" << std::endl;
this->cond_->wait(lock,
std::bind(&TaskSystemParallelThreadPoolSleeping::wakeWorker, this));
std::cout << "WORKER AWAKENS!" << std::endl;
if (this->done_flag == true) {
this->mutex_->unlock();
break;
}
this->mutex_->unlock();
std::cout << "WORKER START" << std::endl;
while (true) {
this->mutex_->lock();
if (this->not_done == 0) { // ALL work done
if (this->total_work != 0) { // 1st time seen by workers
this->total_work = 0;
this->num_wait = 0;
std::cout << "WORKER WAKE MAIN" << std::endl;
this->mutex_->unlock();
this->cond_->notify_all();
}
this->mutex_->unlock();
break;
}
int total = this->total_work;
int id = this->work_counter;
if (id == total) { // NO work initiated or NO work left
this->mutex_->unlock();
continue;
}
++(this->work_counter); // increment counter
this->mutex_->unlock(); // Let others access counters to work
this->runnable->runTask(id, total); // do work
this->mutex_->lock();
--(this->not_done); // decrement counter after work done
this->mutex_->unlock();
}
std::cout << "WORKER DONE" << std::endl;
}
std::cout << "WORKER TERMINATE" << std::endl;
}
Основной поток:
void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
//
// TODO: CS149 students will modify the implementation of this
// method in Part A. The implementation provided below runs all
// tasks sequentially on the calling thread.
// Set-up work
this->mutex_->lock();
std::cout << "MAIN SETUP" << std::endl;
this->runnable = runnable;
this->work_counter = 0;
this->not_done = num_total_tasks;
this->total_work = num_total_tasks;
// Tell workers there is work
std::cout << "MAIN POLLS READINESS" << std::endl;
while (this->num_wait < this->num_T) { // Check if all ready
this->mutex_->unlock();
this->mutex_->lock();
}
std::cout << "ALL WORKERS READY" << std::endl;
this->mutex_->unlock();
this->cond_->notify_all();
// Wait for workers to complete work
std::unique_lock<std::mutex> lock(*this->mutex_);
this->cond_->wait(lock,
std::bind(&TaskSystemParallelThreadPoolSleeping::wakeMain, this));
std::cout << "MAIN END" << std::endl;
}
Состояние для пробуждения рабочего:
bool TaskSystemParallelThreadPoolSleeping::wakeWorker() {
return (this->done_flag == true ||
(this->total_work != 0 && this->num_wait == this->num_T));
}
Состояние для пробуждения основного потока:
bool TaskSystemParallelThreadPoolSleeping::wakeMain() {
return this->total_work == 0;
}
Конструктор пула потоков:
TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
//
// TODO: CS149 student implementations may decide to perform setup
// operations (such as thread pool construction) here.
// Implementations are free to add new class member variables
// (requiring changes to tasksys.h).
//
this->num_T = std::max(1, num_threads - 1);
this->threads = new std::thread[this->num_T];
this->mutex_ = new std::mutex();
this->cond_ = new std::condition_variable();
this->total_work = 0;
this->not_done = 0;
this->work_counter = 0;
this->num_wait = 0;
this->done_flag = {false};
for (int i = 0; i < this->num_T; i++) {
this->threads[i] = std::thread(&TaskSystemParallelThreadPoolSleeping::waitFunc, this);
}
}
Деструктор пула потоков:
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
this->done_flag = true;
this->cond_->notify_all();
for (int i = 0; i < this->num_T; i++) {
this->threads[i].join();
}
delete this->mutex_;
delete[] this->threads;
delete this->cond_;
}
Я думаю, что начало должно быть:
WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
......
Т.е. рабочие должны просыпаться только после основного notify_all()
РЕДАКТИРОВАТЬ: Вот полный журнал. Кажется, что это пробуждение рабочих позже вызывает тупик, когда один из рабочих пробуждается сам и выполняет всю работу, устанавливая this->num_wait=0
и this->total_work=0
. Поэтому все темы видят только this->num_wait=1
.
WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
MAIN ENDWORKER DONE
WORKER WAIT
WORKER DONE
WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER AWAKENS!
WORKER START
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
WORKER DONEMAIN END
MAIN SETUP
MAIN POLLS READINESS
WORKER DONE
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONEWORKER DONE
WORKER WAIT
WORKER WAIT
WORKER DONE
WORKER WAIT
MAIN END
MAIN SETUP
MAIN POLLS READINESS
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
WORKER DONEWORKER DONE
WORKER WAIT
MAIN END
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
ALL WORKERS READY
MAIN END
MAIN SETUP
MAIN POLLS READINESS