Многопоточность очереди событий / задач C ++ - PullRequest
8 голосов
/ 29 мая 2009

Я хотел бы создать класс, методы которого можно вызывать из нескольких потоков. но вместо выполнения метода в потоке, из которого он был вызван, он должен выполнить их все в своем собственном потоке. Нет необходимости возвращать результат и он не должен блокировать вызывающий поток.

Первая попытка реализации я включил ниже. Открытые методы вставляют указатель функции и данные в очередь заданий, которую затем получает рабочий поток. Однако это не очень хороший код, и добавление новых методов громоздко.

В идеале я хотел бы использовать это в качестве базового класса, в который я могу легко добавлять методы (с переменным числом аргументов) с минимальным вмешательством и дублированием кода.

Какой лучший способ сделать это? Есть ли существующий код, который делает что-то подобное? Спасибо

#include <queue>

using namespace std;

class GThreadObject
{
    class event
    {
        public:
        void (GThreadObject::*funcPtr)(void *);
        void * data;
    };

public:
    void functionOne(char * argOne, int argTwo);

private:
    void workerThread();
    queue<GThreadObject::event*> jobQueue;
    void functionOneProxy(void * buffer);
    void functionOneInternal(char * argOne, int argTwo);

};



#include <iostream>
#include "GThreadObject.h"

using namespace std;

/* On a continuous loop, reading tasks from queue
 * When a new event is received it executes the attached function pointer
 * It should block on a condition, but Thread code removed to decrease clutter
 */
void GThreadObject::workerThread()
{
    //New Event added, process it
    GThreadObject::event * receivedEvent = jobQueue.front();

    //Execute the function pointer with the attached data
    (*this.*receivedEvent->funcPtr)(receivedEvent->data);
}

/*
 * This is the public interface, Can be called from child threads
 * Instead of executing the event directly it adds it to a job queue
 * Then the workerThread picks it up and executes all tasks on the same thread
 */
void GThreadObject::functionOne(char * argOne, int argTwo)
{

    //Malloc an object the size of the function arguments
    int argumentSize = sizeof(char*)+sizeof(int);
    void * myData = malloc(argumentSize);
    //Copy the data passed to this function into the buffer
    memcpy(myData, &argOne, argumentSize);

    //Create the event and push it on to the queue
    GThreadObject::event * myEvent = new event;
    myEvent->data = myData;
    myEvent->funcPtr = &GThreadObject::functionOneProxy;
    jobQueue.push(myEvent);

    //This would be send a thread condition signal, replaced with a simple call here
    this->workerThread();
}

/*
 * This handles the actual event
 */
void GThreadObject::functionOneInternal(char * argOne, int argTwo)
{
    cout << "We've made it to functionTwo char*:" << argOne << " int:" << argTwo << endl;

    //Now do the work
}

/*
 * This is the function I would like to remove if possible
 * Split the void * buffer into arguments for the internal Function
 */
void GThreadObject::functionOneProxy(void * buffer)
{
    char * cBuff = (char*)buffer;
    functionOneInternal((char*)*((unsigned int*)cBuff), (int)*(cBuff+sizeof(char*)));
};

int main()
{
    GThreadObject myObj;

    myObj.functionOne("My Message", 23);

    return 0;
}

Ответы [ 8 ]

6 голосов
/ 29 мая 2009

Библиотека Futures входит в Boost и стандартную библиотеку C ++. В ACE также есть нечто подобное, но я бы не хотел рекомендовать это кому-либо (как уже заметил @lothar, это Active Object.)

2 голосов
/ 29 мая 2009

Вы можете решить эту проблему с помощью Boost's Thread -library. Примерно так (наполовину псевдо):


class GThreadObject
{
        ...

        public:
                GThreadObject()
                : _done(false)
                , _newJob(false)
                , _thread(boost::bind(&GThreadObject::workerThread, this))
                {
                }

                ~GThreadObject()
                {
                        _done = true;

                        _thread.join();
                }

                void functionOne(char *argOne, int argTwo)
                {
                        ...

                        _jobQueue.push(myEvent);

                        {
                                boost::lock_guard l(_mutex);

                                _newJob = true;
                        }

                        _cond.notify_one();
                }

        private:
                void workerThread()
                {
                        while (!_done) {
                                boost::unique_lock l(_mutex);

                                while (!_newJob) {
                                        cond.wait(l);
                                }

                                Event *receivedEvent = _jobQueue.front();

                                ...
                        }
                }

        private:
                volatile bool             _done;
                volatile bool             _newJob;
                boost::thread             _thread;
                boost::mutex              _mutex;
                boost::condition_variable _cond;
                std::queue<Event*>        _jobQueue;
};

Кроме того, обратите внимание, как RAII позволяет нам уменьшать этот код и лучше управлять им.

2 голосов
/ 29 мая 2009

Библиотека POCO имеет нечто похожее на ActiveMethod (наряду с некоторыми связанными функциями, например, ActiveResult) в разделе потоков. Исходный код легкодоступен и понятен.

1 голос
/ 29 мая 2009

Ниже приведена реализация, которая не требует метода "functionProxy". Несмотря на то, что добавлять новые методы проще, это все еще грязно.

Boost :: Bind и "Futures" действительно, похоже, исправят многое из этого. Думаю, я посмотрю на буст-код и посмотрю, как он работает. Спасибо всем за ваши предложения.

GThreadObject.h

#include <queue>

using namespace std;

class GThreadObject
{

    template <int size>
    class VariableSizeContainter
    {
        char data[size];
    };

    class event
    {
        public:
        void (GThreadObject::*funcPtr)(void *);
        int dataSize;
        char * data;
    };

public:
    void functionOne(char * argOne, int argTwo);
    void functionTwo(int argTwo, int arg2);


private:
    void newEvent(void (GThreadObject::*)(void*), unsigned int argStart, int argSize);
    void workerThread();
    queue<GThreadObject::event*> jobQueue;
    void functionTwoInternal(int argTwo, int arg2);
    void functionOneInternal(char * argOne, int argTwo);

};

GThreadObject.cpp

#include <iostream>
#include "GThreadObject.h"

using namespace std;

/* On a continuous loop, reading tasks from queue
 * When a new event is received it executes the attached function pointer
 * Thread code removed to decrease clutter
 */
void GThreadObject::workerThread()
{
    //New Event added, process it
    GThreadObject::event * receivedEvent = jobQueue.front();

    /* Create an object the size of the stack the function is expecting, then cast the function to accept this object as an argument.
     * This is the bit i would like to remove
     * Only supports 8 byte argument size e.g 2 int's OR pointer + int OR myObject8bytesSize
     * Subsequent data sizes would need to be added with an else if
     * */
    if (receivedEvent->dataSize == 8)
    {
        const int size = 8;

        void (GThreadObject::*newFuncPtr)(VariableSizeContainter<size>);
        newFuncPtr = (void (GThreadObject::*)(VariableSizeContainter<size>))receivedEvent->funcPtr;

        //Execute the function
        (*this.*newFuncPtr)(*((VariableSizeContainter<size>*)receivedEvent->data));
    }

    //Clean up
    free(receivedEvent->data);
    delete receivedEvent;

}

void GThreadObject::newEvent(void (GThreadObject::*funcPtr)(void*), unsigned int argStart, int argSize)
{

    //Malloc an object the size of the function arguments
    void * myData = malloc(argSize);
    //Copy the data passed to this function into the buffer
    memcpy(myData, (char*)argStart, argSize);

    //Create the event and push it on to the queue
    GThreadObject::event * myEvent = new event;
    myEvent->data = (char*)myData;
    myEvent->dataSize = argSize;
    myEvent->funcPtr = funcPtr;
    jobQueue.push(myEvent);

    //This would be send a thread condition signal, replaced with a simple call here
    this->workerThread();

}

/*
 * This is the public interface, Can be called from child threads
 * Instead of executing the event directly it adds it to a job queue
 * Then the workerThread picks it up and executes all tasks on the same thread
 */
void GThreadObject::functionOne(char * argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionOneInternal, (unsigned int)&argOne, sizeof(char*)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionOneInternal(char * argOne, int argTwo)
{
    cout << "We've made it to functionOne Internal char*:" << argOne << " int:" << argTwo << endl;

    //Now do the work
}

void GThreadObject::functionTwo(int argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionTwoInternal, (unsigned int)&argOne, sizeof(int)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionTwoInternal(int argOne, int argTwo)
{
    cout << "We've made it to functionTwo Internal arg1:" << argOne << " int:" << argTwo << endl;
}

main.cpp

#include <iostream>
#include "GThreadObject.h"

int main()
{

    GThreadObject myObj;

    myObj.functionOne("My Message", 23);
    myObj.functionTwo(456, 23);


    return 0;
}

Edit: просто для полноты я сделал реализацию с Boost :: bind. Основные отличия:

queue<boost::function<void ()> > jobQueue;

void GThreadObjectBoost::functionOne(char * argOne, int argTwo)
{
    jobQueue.push(boost::bind(&GThreadObjectBoost::functionOneInternal, this, argOne, argTwo));

    workerThread();
}

void GThreadObjectBoost::workerThread()
{
    boost::function<void ()> func = jobQueue.front();
    func();
}

Используя реализацию boost для 10000000 итераций functionOne (), это заняло ~ 19сек. Однако реализация без усиления заняла всего ~ 6,5 сек. Так что примерно в 3 раза медленнее. Я предполагаю, что поиск хорошей очереди без блокировки будет самой большой горловиной производительности здесь. Но это все еще довольно большая разница.

1 голос
/ 29 мая 2009

Вот класс, который я написал для аналогичной цели (я использую его для обработки событий, но вы, конечно, можете переименовать его в ActionQueue - и переименовать его методы).

Вы используете это так:

С функцией, которую вы хотите вызвать: void foo (const int x, const int y) { /*...*/ }

А: EventQueue q;

q.AddEvent (boost :: bind (foo, 10, 20));

В рабочем потоке

q.PlayOutEvents ();

Примечание. Должно быть достаточно легко добавить код для блокировки при условии, чтобы избежать использования циклов ЦП.

Код (Visual Studio 2003 с бустом 1.34.1):

#pragma once

#include <boost/thread/recursive_mutex.hpp>
#include <boost/function.hpp>
#include <boost/signals.hpp>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
#include <string>
using std::string;


// Records & plays out actions (closures) in a safe-thread manner.

class EventQueue
{
    typedef boost::function <void ()> Event;

public:

    const bool PlayOutEvents ()
    {
        // The copy is there to ensure there are no deadlocks.
        const std::vector<Event> eventsCopy = PopEvents ();

        BOOST_FOREACH (const Event& e, eventsCopy)
        {
            e ();
            Sleep (0);
        }

        return eventsCopy.size () > 0;
    }

    void AddEvent (const Event& event)
    {
        Mutex::scoped_lock lock (myMutex);

        myEvents.push_back (event);
    }

protected:

    const std::vector<Event> PopEvents ()
    {
        Mutex::scoped_lock lock (myMutex);

        const std::vector<Event> eventsCopy = myEvents;
        myEvents.clear ();

        return eventsCopy;
    }

private:

    typedef boost::recursive_mutex Mutex;
    Mutex myMutex;

    std::vector <Event> myEvents;

};

Надеюсь, это поможет. :)

Мартин Бильски

1 голос
/ 29 мая 2009

Для расширяемости и ремонтопригодности (и других возможностей) вы можете определить абстрактный класс (или интерфейс) для "задания", которое должен выполнять поток. Тогда пользователь (ы) вашего пула потоков будет реализовывать этот интерфейс и давать ссылку на объект в пул потоков. Это очень похоже на дизайн активного объекта Symbian: все подклассы AO CActive и должны реализовывать такие методы, как Run () и Cancel ().

Для простоты ваш интерфейс (абстрактный класс) может быть таким простым:

class IJob
{
    virtual Run()=0;
};

Тогда пул потоков или один поток, принимающий запросы, будет иметь что-то вроде:

class CThread
{
   <...>
public:
   void AddJob(IJob* iTask);
   <...>
};

Естественно, у вас будет несколько задач, которые могут иметь все виды дополнительных сеттеров / геттеров / атрибутов и все, что вам нужно в любой сфере жизни. Однако единственное, что нужно, - это реализовать метод Run (), который бы выполнял длительные вычисления:

class CDumbLoop : public IJob
{
public:
    CDumbJob(int iCount) : m_Count(iCount) {};
    ~CDumbJob() {};
    void Run()
    {
        // Do anything you want here
    }
private:
    int m_Count;
};
1 голос
/ 29 мая 2009

Вас может заинтересовать Активный объект один из шаблонов ACE структуры ACE .

Как Николай указал фьючерсы запланированы для стандарта C ++ в будущем (каламбур)).

0 голосов
/ 29 мая 2009

Вы должны взглянуть на библиотеку Boost ASIO. Он предназначен для асинхронной отправки событий. Он может быть связан с библиотекой Boost Thread для построения системы, которую вы описали.

Вам потребуется создать экземпляр одного объекта boost::asio::io_service и запланировать серию асинхронных событий (boost::asio::io_service::post или boost::asio::io_service::dispatch). Затем вы вызываете функцию-член run из потоков n . Объект io_service является поточно-ориентированным и гарантирует, что ваши асинхронные обработчики будут отправляться только в потоке, из которого вы вызвали io_service::run.

Объект boost::asio::strand также полезен для простой синхронизации потоков.

Я думаю, что библиотека ASIO - очень элегантное решение этой проблемы.

...