Повысить синхронизацию - PullRequest
       6

Повысить синхронизацию

3 голосов
/ 26 января 2012

У меня есть темы NUM_THREADS со следующими кодами в моей теме:

/*
Calculate some_value;
*/

//Critical section to accummulate all thresholds
{
    boost::mutex::scoped_lock lock(write_mutex);
    T += some_value;
    num_threads++;
    if (num_threads == NUM_THREADS){
        T = T/NUM_THREADS;
        READY = true;
        cond.notify_all();
        num_threads = 0;
    }
}

//Wait for average threshold to be ready
if (!READY)
{
    boost::unique_lock<boost::mutex> lock(wait_mutex);
    while (!READY){
        cond.wait(lock);
    }
}
//End critical section

/*
do_something;
*/

По сути, я хочу, чтобы все потоки ждали сигнала READY, прежде чем продолжить. num_thread имеет значение 0, а READY имеет значение false перед созданием потоков. Время от времени возникает тупик. Может кто-нибудь помочь, пожалуйста? Все переменные наддува объявляются следующим образом:

boost::mutex write_mutex;
boost::mutex wait_mutex;
boost::condition cond;

Ответы [ 2 ]

1 голос
/ 29 января 2012

Код имеет условие гонки на флаге READY (который я предполагаю, что это просто переменная bool). Что может произойти (то есть один из возможных вариантов чередования выполнения потока):

Thread T1:                                 Thread T2:
if (!READY)                                
{
    unique_lock<mutex> lock(wait_mutex);   mutex::scoped_lock lock(write_mutex);
    while (!READY)                         /* ... */
    {                                      READY = true;
        /* !!! */                          cond.notify_all();
        cond.wait(lock);
    }
}

Код, проверяющий флаг READY, не синхронизируется с кодом, его устанавливающим (обратите внимание, что блокировки для этих критических разделов различны). И когда T1 находится в «дыре» между проверкой флага и ожиданием на cond, T2 может установить флаг и отправить сигнал на cond, который T1 может пропустить.

Самое простое решение - заблокировать правильный мьютекс для обновления READY и уведомления о состоянии:

/*...*/
T = T/NUM_THREADS;
{
    boost::mutex::scoped_lock lock(wait_mutex);
    READY = true;
    cond.notify_all();
}
1 голос
/ 26 января 2012

Похоже, барьеры Boost.Thread может быть то, что вам нужно.

Вот рабочий пример, который усредняет значения, предоставленные несколькими рабочими потоками. Каждый рабочий поток использует один и тот же общий барьер (через экземпляр accumulator) для синхронизации друг друга.

#include <cstdlib>
#include <iostream>
#include <vector>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

boost::mutex coutMutex;
typedef boost::lock_guard<boost::mutex> LockType;

class Accumulator
{
public:
    Accumulator(int count) : barrier_(count), sum_(0), count_(count) {}

    void accumulateAndWait(float value)
    {
        {
            // Increment value
            LockType lock(mutex_);
            sum_ += value;
        }
        barrier_.wait(); // Wait for other the threads to wait on barrier.
    }

    void wait() {barrier_.wait();} // Wait on barrier without changing sum.

    float sum() {LockType lock(mutex_); return sum_;} // Return current sum

    float average() {LockType lock(mutex_); return sum_ / count_;}

    // Reset the sum. The barrier is automatically reset when triggered.
    void reset() {LockType lock(mutex_); sum_ = 0;}

private:
    typedef boost::lock_guard<boost::mutex> LockType;
    boost::barrier barrier_;
    boost::mutex mutex_;
    float sum_;
    int count_;
};

/*  Posts a value for the accumulator to add and waits for other threads
    to do the same. */
void workerFunction(Accumulator& accumulator)
{
    // Sleep for a random amount of time before posting value
    int randomMilliseconds = std::rand() % 3000;
    boost::posix_time::time_duration randomDelay =
            boost::posix_time::milliseconds(randomMilliseconds);
    boost::this_thread::sleep(randomDelay);

    // Post some random value
    float value = std::rand() % 100;

    {
        LockType lock(coutMutex);
        std::cout << "Thread " << boost::this_thread::get_id() << " posting "
                  << value << " after " << randomMilliseconds << "ms\n";
    }
    accumulator.accumulateAndWait(value);

    float avg = accumulator.average();

    // Print a message to indicate this thread is past the barrier.
    {
        LockType lock(coutMutex);
        std::cout << "Thread " << boost::this_thread::get_id() << " unblocked. "
                  << "Average = " << avg << "\n" << std::flush;
    }
}

int main()
{
    int workerThreadCount = 5;
    Accumulator accumulator(workerThreadCount);

    // Create and launch worker threads
    boost::thread_group threadGroup;
    for (int i=0; i<workerThreadCount; ++i)
    {
        threadGroup.create_thread(
                boost::bind(&workerFunction, boost::ref(accumulator)));
    }

    // Wait for all worker threads to finish
    threadGroup.join_all();
    {
        LockType lock(coutMutex);
        std::cout << "All worker threads finished\n" << std::flush;
    }

    /* Pause a bit before exiting, to give worker threads a chance to
       print their messages. */
    boost::this_thread::sleep(boost::posix_time::seconds(1));
}

Я получаю следующий вывод:

Thread 0x100100f80 posting 72 after 1073ms
Thread 0x100100d30 posting 44 after 1249ms
Thread 0x1001011d0 posting 78 after 1658ms
Thread 0x100100ae0 posting 23 after 1807ms
Thread 0x100101420 posting 9 after 1930ms
Thread 0x100101420 unblocked. Average = 45.2
Thread 0x100100f80 unblocked. Average = 45.2
Thread 0x100100d30 unblocked. Average = 45.2
Thread 0x1001011d0 unblocked. Average = 45.2
Thread 0x100100ae0 unblocked. Average = 45.2
All worker threads finished
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...