Спящий поток: рабочий поток пробуждается без уведомления () из основного потока - PullRequest
0 голосов
/ 22 октября 2019

Я реализую пул потоков, в котором рабочие спят, когда работа не готова, а основной поток спит, когда рабочие заняты. Я заметил, что рабочие потоки продолжают работать после вызова wait(), хотя основной поток не notify_all().

Вывод выглядит примерно так:

WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
......

Рабочая функция:

void TaskSystemParallelThreadPoolSleeping::waitFunc() {
    std::unique_lock<std::mutex> lock(*this->mutex_);
    while(true) {
        this->num_wait++;
        std::cout << "WORKER WAIT" << std::endl;
        this->cond_->wait(lock,
                        std::bind(&TaskSystemParallelThreadPoolSleeping::wakeWorker, this));
        std::cout << "WORKER AWAKENS!" << std::endl;
        if (this->done_flag == true) {
            this->mutex_->unlock();
            break;
        }
        this->mutex_->unlock();

        std::cout << "WORKER START" << std::endl;
        while (true) {
            this->mutex_->lock();

            if (this->not_done == 0) {  // ALL work done
                if (this->total_work != 0) {  // 1st time seen by workers
                    this->total_work = 0;
                    this->num_wait = 0;
                    std::cout << "WORKER WAKE MAIN" << std::endl;
                    this->mutex_->unlock();
                    this->cond_->notify_all();
                }
                this->mutex_->unlock();
                break;
            }

            int total = this->total_work;
            int id = this->work_counter;
            if (id == total) {  // NO work initiated or NO work left
                this->mutex_->unlock();
                continue;
            }

            ++(this->work_counter);  // increment counter
            this->mutex_->unlock();  // Let others access counters to work

            this->runnable->runTask(id, total); // do work

            this->mutex_->lock();
            --(this->not_done); // decrement counter after work done
            this->mutex_->unlock();
        }
        std::cout << "WORKER DONE" << std::endl;
    }
    std::cout << "WORKER TERMINATE" << std::endl;
}

Основной поток:

void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
    //
    // TODO: CS149 students will modify the implementation of this
    // method in Part A.  The implementation provided below runs all
    // tasks sequentially on the calling thread.

    // Set-up work
    this->mutex_->lock();
    std::cout << "MAIN SETUP" << std::endl;
    this->runnable = runnable;
    this->work_counter = 0;
    this->not_done = num_total_tasks;
    this->total_work = num_total_tasks;

    // Tell workers there is work
    std::cout << "MAIN POLLS READINESS" << std::endl;
    while (this->num_wait < this->num_T) {  // Check if all ready
        this->mutex_->unlock();
        this->mutex_->lock();
    }
    std::cout << "ALL WORKERS READY" << std::endl;
    this->mutex_->unlock();
    this->cond_->notify_all();

    // Wait for workers to complete work
    std::unique_lock<std::mutex> lock(*this->mutex_);
    this->cond_->wait(lock,
                    std::bind(&TaskSystemParallelThreadPoolSleeping::wakeMain, this));
    std::cout << "MAIN END" << std::endl;
}

Состояние для пробуждения рабочего:

bool TaskSystemParallelThreadPoolSleeping::wakeWorker() {
    return (this->done_flag == true ||
                    (this->total_work != 0 && this->num_wait == this->num_T));
}

Состояние для пробуждения основного потока:

bool TaskSystemParallelThreadPoolSleeping::wakeMain() {
    return this->total_work == 0;
}

Конструктор пула потоков:

TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
    //
    // TODO: CS149 student implementations may decide to perform setup
    // operations (such as thread pool construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //
    this->num_T = std::max(1, num_threads - 1);
    this->threads = new std::thread[this->num_T];
    this->mutex_ = new std::mutex();
    this->cond_ = new std::condition_variable();

    this->total_work = 0;
    this->not_done = 0;
    this->work_counter = 0;
    this->num_wait = 0;
    this->done_flag = {false};

    for (int i = 0; i < this->num_T; i++) {
        this->threads[i] = std::thread(&TaskSystemParallelThreadPoolSleeping::waitFunc, this);
    }
}

Деструктор пула потоков:

TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    this->done_flag = true;
    this->cond_->notify_all();

    for (int i = 0; i < this->num_T; i++) {
        this->threads[i].join();
    }

    delete this->mutex_;
    delete[] this->threads;
    delete this->cond_;
}

Я думаю, что начало должно быть:

WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
......

Т.е. рабочие должны просыпаться только после основного notify_all()

РЕДАКТИРОВАТЬ: Вот полный журнал. Кажется, что это пробуждение рабочих позже вызывает тупик, когда один из рабочих пробуждается сам и выполняет всю работу, устанавливая this->num_wait=0 и this->total_work=0. Поэтому все темы видят только this->num_wait=1.

WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
MAIN ENDWORKER DONE
WORKER WAIT
WORKER DONE
WORKER WAIT

MAIN SETUP
MAIN POLLS READINESS
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER AWAKENS!
WORKER START
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
WORKER DONEMAIN END
MAIN SETUP
MAIN POLLS READINESS
WORKER DONE
WORKER WAIT

WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONEWORKER DONE
WORKER WAIT

WORKER WAIT
WORKER DONE
WORKER WAIT
MAIN END
MAIN SETUP
MAIN POLLS READINESS
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
WORKER DONEWORKER DONE
WORKER WAIT
MAIN END
MAIN SETUP
MAIN POLLS READINESS

WORKER WAIT
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
ALL WORKERS READY
MAIN END
MAIN SETUP
MAIN POLLS READINESS

1 Ответ

0 голосов
/ 22 октября 2019

Причина, по которой «рабочий поток пробуждается без уведомления от основного потока», довольно очевидна: приращение this-> num_wait и this-> cond _-> wait находится в одном и том же блоке без снятия блокировки / уведомления об этом. основной поток, если условие пробуждения условие условия истинно. Последний поток в опросе сразу проходит условие, определенное в wakeWorker (), отсюда и ваше наблюдение.

(Надеюсь, этот код - просто игрушечный код, с которым можно поиграть - у него слишком много проблем ... онменя не удивляет, если он заходит в тупик при ручной блокировке / разблокировке мьютекса ...)

        this->num_wait++;
        std::cout << "WORKER WAIT" << std::endl;
        this->cond_->wait(lock,
                        std::bind(&TaskSystemParallelThreadPoolSleeping::wakeWorker, this));
        std::cout << "WORKER AWAKENS!" << std::endl;

В соответствии с https://en.cppreference.com/w/cpp/thread/condition_variable/wait, condition_variable :: wait () эквивалентно

while (!pred()) {
    wait(lock);
}

, поэтому, если pred () возвращает true, выполнение не снимает блокировку

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...