Как распараллелить для l oop в C ++ создание пула потоков только один раз - PullRequest
0 голосов
/ 25 марта 2020

У меня есть программа на C ++, которая должна запускаться в Windows.

У меня есть игра, в которой основной l oop есть в функции WinMain, которая вызывает функции Update в каждой итерации , Я хочу применить многопоточность ко всем oop функциям обновления.

int __stdcall WinMain()
{
    // Windows initializations

    // Main loop
    {
        Game game = new Game();
        bool bExit = false;
        while (!bExit)
        {
            MSG msg;
            while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
            {
                if (msg.message == WM_QUIT) bExit = true;
                TranslateMessage(&msg);
                DispatchMessage(&msg);
            }

            game->Update();
        }
        delete game;
        game = nullptr;
    }

    // Windows destructions

    return 0;
}
// Game.cpp

void Game::Update() {
    // Loop that I want to be parallelized but I don't want to create the thread pool here because it's called on every frame
    for (size_t i = 0; i < entities.size(); i++) {
        entities[i]->Update();
    }
}

Я пытался использовать OpenMP, но не мог поместить #pragma omp parallel в функцию WinMain (приложение будет cra sh) и если я помещу #pragma omp parallel for внутри Game::Update непосредственно перед l oop, это фактически снижает производительность, потому что создает поток l oop в каждом кадре.

I'm ищу основанное на библиотеке или, предпочтительно, собственное решение, которое позволяет мне легко распараллелить это l oop.

Единственное требование - это то, что оно работает в Windows, я бы предпочел не использовать библиотеку (например, буст, хотя OpenMP в порядке).

РЕДАКТИРОВАТЬ: Я получил его работать с PPL concurrency::parallel_for. Это не ухудшает производительность, но и не увеличивает ее ... Мой вопрос относительно этого: создает ли эта функция и уничтожает ли ее пул потоков при каждом вызове?

Ответы [ 2 ]

1 голос
/ 25 марта 2020

Здесь решение с базисной c библиотекой потоков. Я смоделировал ваши Entity и Game классы.
Обратите внимание, что в этом решении рабочий поток создается и запускается ОДИН раз в начале. Они будут вызываться при каждом Update звонке. Когда Update не вызывается, потоки спят ...

Я сделал все возможное, чтобы сохранить архитектуру вашей программы. Обратите внимание, что мы можем использовать другую реализацию с std :: thread, std :: mutex, ... но я просто хотел дать вам идею ...

#define NB_ENTITIES 10

class CEntity
{
public:
    void Update(){};
    ~CEntity () {}
};

typedef struct ThreadData
{
    HANDLE  hMutex;
    HANDLE  hMutexDestructor;
    CEntity *pCEntity;  
    DWORD   *dwStat;

} ThreadData;

//------------------------------------
// This function will call "Upadate"
//------------------------------------
DWORD WINAPI MyThreadFunction( LPVOID lpParam )
{
    ThreadData *ThreadDatpa      = (ThreadData*) lpParam;
    CEntity    *pEntity          = ThreadDatpa->pCEntity;
    HANDLE     hMutex            = ThreadDatpa->hMutex;
    HANDLE     hMutexDestructor  = ThreadDatpa->hMutexDestructor;
    DWORD      *dwStat           = ThreadDatpa->dwStat;

    while (true)
    {
        // When no update, thread sleep ... 0% CPU ...
        WaitForSingleObject(hMutex, INFINITE);
        if ( 0 == *dwStat ) break; // here thread stat for stopping 

        if ( nullptr != pEntity ) 
            pEntity->Update(); // Call your unpdate function ...
    }

    // Each worker thread must release it semaphore.
    // Destructor must get ALL released semaphore before deleting memory
    ReleaseSemaphore(hMutexDestructor, 1, NULL );

    return 0;
}

class Game 
{
public :
    vector<ThreadData*>  entities; // Vector of entities pointers
    vector<HANDLE>   thread_group; // vector of threads handle


    //This function must called ONE time at the beginning (at init)
    void StartThreads ()
    {
        DWORD  dwRet = 0;
        HANDLE hTemp = NULL;

        for (size_t i = 0; i <NB_ENTITIES; i++) 
        {
            CEntity *pCEntity = new CEntity (); // just to simulate entity

            // This semaphore is used to release thread when update is called
            HANDLE ghMutex= CreateSemaphore( NULL,0, 1, NULL); 

            // This semaphore is used when destruction to check if all threads is terminated
            HANDLE ghMutexDestructor= CreateSemaphore( NULL,0, 1, NULL);

            // create a new CEntity data ...
            ThreadData *pThreadData = new ThreadData ();

            pThreadData->pCEntity = pCEntity;
            pThreadData->hMutex   = ghMutex;
            pThreadData->hMutexDestructor = ghMutexDestructor;
            pThreadData->dwStat   = new DWORD (1); // default status = 1 

            entities.push_back ( pThreadData );         
        }

        // Here we start ONE time Threads worker.
        // Threads are stopped untile update was called
        for (size_t i = 0; i < entities.size(); i++) 
        {
            // Each thread has it own entity
            hTemp = CreateThread( NULL,0, MyThreadFunction, entities.at(i), 0, &dwRet);  
            if ( NULL != hTemp )
                thread_group.push_back (hTemp);
        }

    }

    // Your function update juste wakeup threads
    void Game::Update() {
        for (size_t i = 0; i < entities.size(); i++) 
        {
            HANDLE hMutex = entities.at(i)->hMutex;
            if ( NULL != hMutex )
                ReleaseSemaphore(hMutex, 1, NULL );
        }
    }


    ~Game()
    {
        // Modifie stat before releasing threads
        for (size_t i = 0; i < entities.size(); i++) 
            *(entities.at(i)->dwStat) = 0;

        // Release threads (status =0 so break ...)
        Update();

        // This can be replaced by waitformultipleobjects ...
        for (size_t i = 0; i < entities.size(); i++)
            WaitForSingleObject ( entities.at(i)->hMutexDestructor, INFINITE);

        // Now i'm sur that all threads are terminated
        for (size_t i = 0; i < entities.size(); i++)
        {
            delete entities.at(i)->pCEntity;
            delete entities.at(i)->dwStat;
            CloseHandle (entities.at(i)->hMutex);
            CloseHandle (entities.at(i)->hMutexDestructor);
            delete entities.at(i);
        }

    }
};
1 голос
/ 25 марта 2020

Предоставление примера Boost.Asio, основанного на нашей дискуссии в комментариях:

#include <thread>

#include <boost/asio.hpp>

namespace asio = boost::asio;

int main( int argc, int* argv[] ) {
    asio::io_context context{};

    asio::post( context, []() { /* any arbitrary job */ } );

    // this donates the new_thread to the thread pool
    std::thread new_thread{ [&](){ context.run(); } };

    // this donates the current thread to the thread pool
    context.run();
}

Здесь необходимо отметить следующие основные моменты:

  1. asio::post позволяет вам отправить произвольные задания для io_context
  2. , вызывающие io_context::run из потока, передают этот поток в пул потоков, в котором выполняется контекст

Если вам нужен предварительно созданный пул потоков, Вы можете использовать boost::asio::thread_pool и вызывать asio::post, чтобы таким же образом размещать задания в пуле потоков.

Вам нужно только загрузить Boost, вам не нужно его устанавливать. Если вы добавите BOOST_ERROR_CODE_HEADER_ONLY к вашей компиляции, Boost.Asio будет полностью работать только с заголовками.

EDIT:

Вам все еще нужно запустить скрипт установки Boost, но вам не нужно собирать любая из библиотек.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...