Ожидание родителя - PullRequest
       26

Ожидание родителя

4 голосов
/ 27 апреля 2011

Я реализую простой механизм пула потоков для моего сервера Ubuntu (для моей многопользовательской программы анонимного чата), и мне нужно заставить мои рабочие потоки спать до тех пор, пока не понадобится задание (в виде указателя на функцию и параметра)быть выполненным.

Моя текущая система выходит из окна.Я (рабочий поток) спрашиваю менеджера, если работа доступна, и если нет сна в течение 5 мс.Если есть, добавьте задание в рабочую очередь и выполните функцию.Жалкая трата циклов.

Что бы я хотел бы сделать, это создать простую систему, похожую на событие.Я имею в виду наличие вектора мьютексов (по одному на каждого работника) и указание дескриптора мьютекса в качестве параметра при создании.Затем в моем классе менеджера (который содержит и раздает задания) всякий раз, когда создается поток, блокируйте мьютекс.Когда необходимо выполнить задание, разблокируйте следующий мьютекс в очереди, подождите, пока оно будет заблокировано и разблокировано, и разблокируйте его.Однако мне интересно, есть ли гораздо лучшие способы для этого.


tldr; Так что мой вопрос таков.Какой самый эффективный, действенный и самый безопасный способ заставить поток ждать работу от управляющего класса?Является ли опрос методом, который я должен рассмотреть (более 1000 клиентов одновременно), является ли блокировка мьютекса приличной?Или есть другие методы?

Ответы [ 5 ]

6 голосов
/ 27 апреля 2011

Вам нужна переменная условия.
Все рабочие потоки вызывают wait (), который их приостанавливает.

Затем родительский поток помещает рабочий элемент в очередь и вызывает сигнал об условиипеременная.Это разбудит одну нить, которая спит.Он может удалить задание из очереди, выполнить задание и затем вызвать ожидание для условной переменной, чтобы вернуться в спящий режим.

Try:

#include <pthread.h>
#include <memory>
#include <list>

// Use RAII to do the lock/unlock
struct MutexLock
{
    MutexLock(pthread_mutex_t& m) : mutex(m)    { pthread_mutex_lock(&mutex); }
    ~MutexLock()                                { pthread_mutex_unlock(&mutex); }
    private:
        pthread_mutex_t&    mutex;
};

// The base class of all work we want to do.
struct Job
{
    virtual void doWork()  = 0;
};

// pthreads is a C library the call back must be a C function.
extern "C" void* threadPoolThreadStart(void*);

// The very basre minimal part of a thread pool
// It does not create the workers. You need to create the work threads
// then make them call workerStart(). I leave that as an exercise for you.
class ThreadPool
{

    public:
         ThreadPool(unsigned int threadCount=1);
        ~ThreadPool();

        void addWork(std::auto_ptr<Job> job);
    private:

        friend void* threadPoolThreadStart(void*);
        void workerStart();

        std::auto_ptr<Job>  getJob();

        bool                finished;   // Threads will re-wait while this is true.
        pthread_mutex_t     mutex;      // A lock so that we can sequence accesses.
        pthread_cond_t      cond;       // The condition variable that is used to hold worker threads.
        std::list<Job*>     workQueue;  // A queue of jobs.
        std::vector<pthread_t>threads;
};

// Create the thread pool
ThreadPool::ThreadPool(int unsigned threadCount)
    : finished(false)
    , threads(threadCount)
{
    // If we fail creating either pthread object than throw a fit.
    if (pthread_mutex_init(&mutex, NULL) != 0)
    {   throw int(1);
    }

    if (pthread_cond_init(&cond, NULL) != 0)
    {
        pthread_mutex_destroy(&mutex);
        throw int(2);
    }
    for(unsigned int loop=0; loop < threadCount;++loop)
    {
       if (pthread_create(threads[loop], NULL, threadPoolThreadStart, this) != 0)
       {
            // One thread failed: clean up
            for(unsigned int kill = loop -1; kill < loop /*unsigned will wrap*/;--kill)
            {
                pthread_kill(threads[kill], 9);
            }
            throw int(3);
       }
    }
}

// Cleanup any left overs.
// Note. This does not deal with worker threads.
//       You need to add a method to flush all worker threads
//       out of this pobject before you let the destructor destroy it.
ThreadPool::~ThreadPool()
{
    finished = true;
    for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop)
    {
        // Send enough signals to free all threads.
        pthread_cond_signal(&cond);
    }
    for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop)
    {
        // Wait for all threads to exit (they will as finished is true and
        //                               we sent enough signals to make sure
        //                               they are running).
        void*  result;
        pthread_join(*loop, &result);
    }
    // Destroy the pthread objects.
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);

    // Delete all re-maining jobs.
    // Notice how we took ownership of the jobs.
    for(std::list<Job*>::const_iterator loop = workQueue.begin(); loop != workQueue.end();++loop)
    {
        delete *loop;
    }
}

// Add a new job to the queue
// Signal the condition variable. This will flush a waiting worker
// otherwise the job will wait for a worker to finish processing its current job.
void ThreadPool::addWork(std::auto_ptr<Job> job)
{
    MutexLock  lock(mutex);

    workQueue.push_back(job.release());
    pthread_cond_signal(&cond);
}

// Start a thread.
// Make sure no exceptions escape as that is bad.
void* threadPoolThreadStart(void* data)
{
    ThreadPool* pool = reinterpret_cast<ThreadPool*>(workerStart);
    try
    {
        pool->workerStart();
    }
    catch(...){}
    return NULL;
}

// This is the main worker loop.
void ThreadPool::workerStart()
{
    while(!finished)
    {
        std::auto_ptr<Job>    job    = getJob();
        if (job.get() != NULL)
        {
            job->doWork();
        }
    }
}

// The workers come here to get a job.
// If there are non in the queue they are suspended waiting on cond
// until a new job is added above.
std::auto_ptr<Job> ThreadPool::getJob()
{
    MutexLock  lock(mutex);

    while((workQueue.empty()) && (!finished))
    {
        pthread_cond_wait(&cond, &mutex);
        // The wait releases the mutex lock and suspends the thread (until a signal).
        // When a thread wakes up it is help until it can acquire the mutex so when we
        // get here the mutex is again locked.
        //
        // Note: You must use while() here. This is because of the situation.
        //   Two workers:  Worker A processing job A.
        //                 Worker B suspended on condition variable.
        //   Parent adds a new job and calls signal.
        //   This wakes up thread B. But it is possible for Worker A to finish its
        //   work and lock the mutex before the Worker B is released from the above call.
        //
        //   If that happens then Worker A will see that the queue is not empty
        //   and grab the work item in the queue and start processing. Worker B will
        //   then lock the mutext and proceed here. If the above is not a while then
        //   it would try and remove an item from an empty queue. With a while it sees
        //   that the queue is empty and re-suspends on the condition variable above.
    }
    std::auto_ptr<Job>  result;
    if (!finished)
    {    result.reset(workQueue.front());
         workQueue.pop_front();
    }

    return result;
}
3 голосов
/ 27 апреля 2011

Обычный способ, которым это реализовано, состоит в том, чтобы иметь очередь queue невыполненной работы, мьютекс mutex, защищающий очередь, и условие ожидания queue_not_empty. Затем каждый рабочий поток выполняет следующее (используя псевдо-API):

while (true) {
    Work * work = 0;
    mutex.lock();
    while ( queue.empty() )
       if ( !queue_not_empty.wait( &mutex, timeout ) )
           return; // timeout - exit the worker thread
    work = queue.front();
    queue.pop_front();
    mutex.unlock();
    work->perform();
}

Вызов wait( &mutex, timeout ) блокируется до тех пор, пока не будет подано условие ожидания или не истечет время ожидания вызова. Переданный mutex атомарно разблокируется внутри wait() и снова блокируется перед возвратом из вызова, чтобы обеспечить согласованное представление очереди для всех участников. timeout будет выбрано довольно большим (в секундах) и приведет к выходу потока (пул потоков начнет новые, если поступит больше работы).

Между тем, функция вставки работы пула потоков делает это:

Work * work = ...;
mutex.lock();
queue.push_back( work );
if ( worker.empty() )
    start_a_new_worker();
queue_not_empty.wake_one();
mutex.unlock();
2 голосов
/ 27 апреля 2011

Поскольку программа сетевого чата предположительно связана с вводом / выводом, а не с процессором, вам не нужны потоки. Вы можете обрабатывать все свои операции ввода-вывода в одном потоке, используя такие средства, как Boost.Asio или основной цикл GLib . Это переносимые абстракции над специфичными для платформы функциями, которые позволяют программе блокировать ожидание активности на любом (потенциально большом) наборе открытых файлов или сокетов, а затем просыпаться и быстро реагировать при возникновении активности.

2 голосов
/ 27 апреля 2011

Классическая синхронизация между производителем и потребителем с несколькими потребителями (рабочие потоки потребляют рабочие запросы).Хорошо известный метод состоит в том, чтобы иметь семафор, каждый рабочий поток делает down(), и каждый раз, когда у вас есть запрос на работу, выполняйте up().Затем выберите запрос из очереди, заблокированной мьютексом.Так как один up() будет активен только один down(), на мьютексе будет фактически минимальный конфликт.

В качестве альтернативы вы можете сделать то же самое с условной переменной, делая ожидание в каждом потоке и вызывая один, когдау тебя есть работа.Сама очередь по-прежнему заблокирована мьютексом (в любом случае condvar требует его).

Последнее, я не совсем уверен, но на самом деле я думаю, что вы действительно можете использовать канал в качестве очереди, включая все синхронизации (рабочие потоки просто пытаются"чтения (SizeOf (запрос))").Немного хакерский, но приводит к меньшему количеству переключений контекста.

1 голос
/ 27 апреля 2011

Самый простой способ сделать это - semaphores.Вот как работает семафор:

Семафор - это, по сути, переменная, которая принимает нулевые / положительные значения.Процессы могут взаимодействовать с ним двумя способами: увеличивать или уменьшать семафор.

Увеличение семафора добавляет 1 к этой магической переменной , и это все. С уменьшением счетчика все становится интереснее : если счет достигает нуля, и процесс пытается снова его уменьшить, поскольку он не может принимать отрицательные значения, он будет блокироваться до тех пор, пока переменная не поднимется .

Если несколько блоков процессов ожидают уменьшения значения семафора, для каждого блока увеличивается только один из них, при этом увеличивается число.

Это позволяет очень легко создать работника / задачусистема: ваш менеджерский процесс ставит задачи в очередь и увеличивает значение семафора для соответствия оставшимся элементам, а ваши рабочие процессы пытаются уменьшить количество и постоянно получать задачу.Когда нет доступных задач, они будут блокироваться и не потреблять процессорное время.Когда он появляется, проснется только один из бездействующих процессов.Магия Insta-Sync.

К сожалению, по крайней мере в мире Unix, семафорный API не очень дружелюбен, поскольку по какой-то причине он работает с массивами семафоров, а не с отдельными.Но вы - простая оболочка от приятного интерфейса!

Ура!

...