Похоже, барьеры Boost.Thread может быть то, что вам нужно.
Вот рабочий пример, который усредняет значения, предоставленные несколькими рабочими потоками. Каждый рабочий поток использует один и тот же общий барьер (через экземпляр accumulator
) для синхронизации друг друга.
#include <cstdlib>
#include <iostream>
#include <vector>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
boost::mutex coutMutex;
typedef boost::lock_guard<boost::mutex> LockType;
class Accumulator
{
public:
Accumulator(int count) : barrier_(count), sum_(0), count_(count) {}
void accumulateAndWait(float value)
{
{
// Increment value
LockType lock(mutex_);
sum_ += value;
}
barrier_.wait(); // Wait for other the threads to wait on barrier.
}
void wait() {barrier_.wait();} // Wait on barrier without changing sum.
float sum() {LockType lock(mutex_); return sum_;} // Return current sum
float average() {LockType lock(mutex_); return sum_ / count_;}
// Reset the sum. The barrier is automatically reset when triggered.
void reset() {LockType lock(mutex_); sum_ = 0;}
private:
typedef boost::lock_guard<boost::mutex> LockType;
boost::barrier barrier_;
boost::mutex mutex_;
float sum_;
int count_;
};
/* Posts a value for the accumulator to add and waits for other threads
to do the same. */
void workerFunction(Accumulator& accumulator)
{
// Sleep for a random amount of time before posting value
int randomMilliseconds = std::rand() % 3000;
boost::posix_time::time_duration randomDelay =
boost::posix_time::milliseconds(randomMilliseconds);
boost::this_thread::sleep(randomDelay);
// Post some random value
float value = std::rand() % 100;
{
LockType lock(coutMutex);
std::cout << "Thread " << boost::this_thread::get_id() << " posting "
<< value << " after " << randomMilliseconds << "ms\n";
}
accumulator.accumulateAndWait(value);
float avg = accumulator.average();
// Print a message to indicate this thread is past the barrier.
{
LockType lock(coutMutex);
std::cout << "Thread " << boost::this_thread::get_id() << " unblocked. "
<< "Average = " << avg << "\n" << std::flush;
}
}
int main()
{
int workerThreadCount = 5;
Accumulator accumulator(workerThreadCount);
// Create and launch worker threads
boost::thread_group threadGroup;
for (int i=0; i<workerThreadCount; ++i)
{
threadGroup.create_thread(
boost::bind(&workerFunction, boost::ref(accumulator)));
}
// Wait for all worker threads to finish
threadGroup.join_all();
{
LockType lock(coutMutex);
std::cout << "All worker threads finished\n" << std::flush;
}
/* Pause a bit before exiting, to give worker threads a chance to
print their messages. */
boost::this_thread::sleep(boost::posix_time::seconds(1));
}
Я получаю следующий вывод:
Thread 0x100100f80 posting 72 after 1073ms
Thread 0x100100d30 posting 44 after 1249ms
Thread 0x1001011d0 posting 78 after 1658ms
Thread 0x100100ae0 posting 23 after 1807ms
Thread 0x100101420 posting 9 after 1930ms
Thread 0x100101420 unblocked. Average = 45.2
Thread 0x100100f80 unblocked. Average = 45.2
Thread 0x100100d30 unblocked. Average = 45.2
Thread 0x1001011d0 unblocked. Average = 45.2
Thread 0x100100ae0 unblocked. Average = 45.2
All worker threads finished