У меня есть очень простая и небольшая программа на C ++, которая создает пул потоков , затем помещает сообщений в очередь блокировки , разделяемую между потоками, чтобы сказать каждомупоток, что делать.
Сообщение может быть: -1 (конец потока -> завершить), -2 (барьер -> ждать, пока все потоки достигнутэто, затем продолжить), другие значения , чтобы сделать случайные вычисления.Цикл выполняется в следующем порядке: некоторые вычисления, барьер, некоторые вычисления, барьер, ..., барьер, конец потока, объединение потоков, выход.
Я не могу понять почемуЯ получаю тупик даже с двумя потоками в пуле.Очередь не может стать пустой, но порядок, в котором я нажимаю и выскакиваю сообщения, всегда приводит к пустой очереди!
Реализация блокирующей очереди - та, что предлагается здесь ( C ++ Эквивалент JavaBlockingQueue ) с добавлением всего двух методов.Я также копирую код очереди ниже.
Любая помощь?
Main.cpp
#include <iostream>
#include <vector>
#include <thread>
#include "Queue.hpp"
using namespace std;
// function executed by each thread
void f(int i, Queue<int> &q){
while(1){
// take a message from blocking queue
int j= q.pop();
// if it is end of stream then exit
if (j==-1) break;
// if it is barrier, wait for other threads to reach it
if (j==-2){
// active wait! BAD, but anyway...
while(q.size() > 0){
;
}
}
else{
// random stuff
int x = 0;
for(int i=0;i<j;i++)
x += 4;
}
}
}
int main(){
Queue<int> queue; //blocking queue
vector<thread> tids; // thread pool
int nt = 2; // number of threads
int dim = 8; // number to control number of operations
// create thread pool, passing thread id and queue
for(int i=0;i<nt;i++)
tids.push_back(thread(f,i, std::ref(queue)));
for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine
// push random number
for(int j=0;j<dist;j++){
queue.push(4);
}
// push barrier code
for(int i=0;i<nt;i++){
queue.push(-2);
}
// active wait! BAD, but anyway...
while (queue.size()>0){
;
}
}
// push end of stream
for(int i=0;i<nt;i++)
queue.push(-1);
// join thread pool
for(int i=0;i<nt;i++){
tids[i].join();
}
return 0;
}
Queue.hpp
#include <deque>
#include <mutex>
#include <condition_variable>
template <typename T>
class Queue
{
private:
std::mutex d_mutex;
std::condition_variable d_condition;
std::deque<T> d_queue;
public:
void push(T const& value) {
{
std::unique_lock<std::mutex> lock(this->d_mutex);
d_queue.push_front(value);
}
this->d_condition.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(this->d_mutex);
this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
T rc(std::move(this->d_queue.back()));
this->d_queue.pop_back();
return rc;
}
bool empty(){
std::unique_lock<std::mutex> lock(this->d_mutex);
return this->d_queue.empty();
}
int size(){
std::unique_lock<std::mutex> lock(this->d_mutex);
return this->d_queue.size();
}
};