Синхронизация потоков с boost :: condition_variable - PullRequest
1 голос
/ 22 сентября 2010

Я провожу несколько экспериментов с многопоточностью в C ++ и не знаю, как решить одну проблему.Допустим, у нас есть пул потоков, который обрабатывает запросы пользователей, используя существующий поток, и создает новый поток, когда нет свободных потоков.Я создал потокобезопасный класс command_queue, в котором есть методы push и pop.pop ожидает, пока очередь пуста, и возвращает, только когда команда доступна или истекло время ожидания.Теперь пришло время реализовать пул потоков.Идея состоит в том, чтобы свободные потоки спали в течение некоторого времени и убивали поток, если после этого периода времени нечего делать.Вот реализация

command_queue::handler_t handler;
while (handler = tasks.pop(timeout))
{
    handler();
}

, здесь мы завершаем процедуру потока, если истекло время ожидания.Это нормально, но есть проблема с созданием нового потока.Допустим, у нас уже есть 2 потока, обрабатывающих пользовательские запросы, они работают в данный момент, но нам нужно выполнить некоторую другую операцию асинхронно.Мы вызываем

thread_pool::start(some_operation);

, который должен начинать новый поток, потому что свободных потоков нет.Когда поток доступен, он вызывает timed_wait для условной переменной, поэтому идея состоит в том, чтобы проверить, есть ли ожидающие потоки.

if (thread_are_free_threads) // ???
   condition.notify_one();
else
   create_thread(thread_proc);

, но как это проверить?Документация говорит, что если нет ожидающих потоков, notify_one ничего не делает.Если бы я мог проверить, действительно ли он ничего не сделал, это было бы решением

if (!condition.notify_one()) // nobody was notified
   create_thread(thread_proc);

Насколько я вижу, это невозможно проверить.

Спасибо за ваши ответы.

Ответы [ 5 ]

2 голосов
/ 22 сентября 2010

Вам нужно создать другую переменную (возможно, семафор), которая знает, сколько потоков запущено, затем вы можете проверить это и создать новый поток, если необходимо, перед вызовом notify.

Другая, лучшеОпция заключается в том, чтобы ваши потоки не выходили, когда они истекают.Они должны остаться в живых в ожидании уведомления.Вместо того, чтобы выйти, когда время ожидания уведомления истекло, проверьте переменную, чтобы увидеть, работает ли программа по-прежнему или она «закрывается», если она все еще работает, затем снова начните ждать.

1 голос
/ 22 сентября 2010

Более типичный пул потоков будет выглядеть так:

Pool::Pool()
{
    runningThreads = 0;
    actualThreads  = 0;
    finished       = false;
    jobQue.Init();

    mutex.Init();
    conditionVariable.Init();

    for(int loop=0; loop < threadCount; ++loop) { startThread(threadroutine); }
}

Pool::threadroutine()
{

    {
        // Extra code to count threads sp we can add more if required.
        RAIILocker doLock(mutex);
        ++ actualThreads;
        ++ runningThreads;
    }
    while(!finished)
    {
         Job job;
         {
             RAIILocker doLock(mutex);

             while(jobQue.empty())
             {
                 // This is the key.
                 // Here the thread is suspended (using zero resources)
                 // until some other thread calls the notify_one on the
                 // conditionVariable. At this point exactly one thread is release
                 // and it will start executing as soon as it re-acquires the lock
                 // on the mutex.
                 //
                 -- runningThreads;
                 conditionVariable.wait(mutex);
                 ++ runningThreads;
             }
             job = jobQue.getJobAndRemoveFromQue();
         }
         job.execute();
    }
    {
        // Extra code to count threads sp we can add more if required.
        RAIILocker doLock(mutex);
        -- actualThreads;
        -- runningThreads;
    }
}

Pool::AddJob(Job job)
{
    RAIILocker doLock(mutex);

    // This is where you would check to see if you need more threads.
    if (runningThreads == actualThreads) // Plus some other conditions.
    {
        // increment both counts. When it waits we decrease the running count.
        startThread(threadroutine);
    }
    jobQue.push_back(job);
    conditionVariable.notify_one();  // This releases one worker thread
                                     // from the call to wait() above.
                                     // Note: The worker thread will not start
                                     //       until this thread releases the mutex.
}
0 голосов
/ 22 сентября 2010

Еще одна идея.Вы можете запросить длину очереди сообщений.Если это слишком долго, создайте нового работника.

0 голосов
/ 22 сентября 2010

Теперь я нашел одно решение, но оно не настолько идеально. У меня переменная-член volatile с именем free - в ней хранится количество свободных потоков в пуле.

void thread_pool::thread_function()
{
    free++;
    command_queue::handler_t handler;
    while (handler = tasks.pop(timeout))
    {
        free--;
        handler();
        free++;
    }
    free--;
}

когда я назначаю задачу потоку, я делаю что-то вроде этого

if (free == 0)
    threads.create_thread(boost::bind(&thread_pool::thread_function, this));

все еще есть проблема с синхронизацией, потому что если контекст будет переключен после освобождения - в thread_function мы могли бы создать новый поток, который нам на самом деле не нужен, но поскольку очередь задач является поточно-безопасной никаких проблем с этим, это просто нежелательные накладные расходы. Можете ли вы предложить решение этого и что вы думаете об этом? Может быть, лучше оставить все как есть, тогда еще одна синхронизация здесь?

0 голосов
/ 22 сентября 2010

Я думаю, вам нужно переосмыслить свой дизайн.В простой модели потока дилера, раздающего работу потокам игрока, дилер помещает работу в очередь сообщений и позволяет одному из игроков забрать работу, когда у него появляется шанс.

В вашем случаеДилер активно управляет пулом потоков, поскольку он знает, какие потоки игрока простаивают, а какие заняты.Так как дилер знает, какой игрок простаивает, он может активно пропустить простое задание и подать сигнал игроку, используя простой семафор (или cond var) - для каждого игрока существует один семафор.В таком случае для дилера может иметь смысл активно уничтожать незанятые нити, предоставив нити работу kill себя .

...