Я использую потоки повышения, чтобы распараллелить вычисления в моей программе.Контроллер управляет расчетными заданиями и результатами.Я создаю группу рабочих потоков, которые получают свои задания от объекта контроллера, в то время как основной поток отображает результаты.Результаты должны быть показаны в правильном порядке.Для достижения этого я использую фьючерсы на повышение в std::deque
.GetNewJob()
добавляет новый boost::promise
в конец очереди и возвращает указатель.GetNextResult()
получает результат из переднего конца очереди.Если результат еще не готов, он блокирует вызывающий поток.
Важные части моего класса Controller:
class Controller
{
public:
Controller();
boost::shared_ptr<boost::promise<Results> > GetNewJob();
Results GetNextResult();
class NoJobsLeft{};
class NoResultsLeft{};
private:
bool JobsLeft() const;
bool ResultsLeft() const;
std::deque<boost::shared_ptr<boost::promise<Results> > > queue_;
boost::mutex mutex_;
boost::condition_variable condition_;
};
Рабочая функция:
void DoWork()
{
try
{
while(true)
{
boost::shared_ptr<boost::promise<Results> >
promise(controller.GetNewJob());
//do calculations
promise->set_value(results);
}
}
catch(NoJobsLeft)
{
}
}
Основная программакод:
Controller controller(args);
boost::thread_group worker_threads;
for (unsigned i = 0; i < n_cpus; ++i)
worker_threads.create_thread(DoWork);
try
{
while(true)
{
Results results = controller.GetNextResult();
std::cout << results;
std::cout << std::endl;
}
}
catch(NoResultsLeft)
{
}
worker_threads.join_all();
Иногда это работает просто отлично, отображаются все результаты. Но очень часто я вообще ничего не вижу.
Я не использую cout
в рабочих потоках.
Реализации GetNewJob()
,GetNextResult()
:
boost::shared_ptr<boost::promise<Results> > Controller::GetNewJob()
{
boost::lock_guard<boost::mutex> lock(mutex_);
if (!JobsLeft())
throw NoJobsLeft();
//determine more information about the job, not important here
queue_.push_back(boost::make_shared<boost::promise<Results> >());
condition_.notify_one();
return queue_.back();
}
Results Controller::GetNextResult()
{
boost::shared_ptr<boost::promise<Results> > results;
{
boost::unique_lock<boost::mutex> lock(mutex_);
if (!ResultsLeft())
throw NoResultsLeft();
while(!queue_.size())
{
condition_.wait(lock);
}
results = queue_.front();
queue_.pop_front();
}
return results->get_future().get();
}
bool Controller::ResultsLeft() const
{
return (queue_.size() || JobsLeft()) ? true : false;
}