Как сделать задачи пула потоков в абстрактном типе функции? - PullRequest
0 голосов
/ 13 марта 2020

Я пытаюсь реализовать очень простой C ++ Thread Pool. Пока я проверил это работает. Тем не менее, я хочу сделать задачи в абстрактной форме. Я искал десятки статей, они не кажутся тем, что я хочу. (Возможно, мое ключевое слово не подходит ...) В настоящее время только функция

void (*)()

может быть принята в качестве функции задачи. Простой код написан ниже.

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <Windows.h>

class CYSThreadPool
{
private:

    static std::thread** s_ppThreads;
    static long s_lThreadCount;
    static bool s_bJoin;
    static std::mutex* s_pMutexJoin;

    static std::queue<void (*)()> s_queueTasks;
    static std::mutex* s_pMutexTasks;

    CYSThreadPool(){}
    ~CYSThreadPool(){}

    static void ThreadFunction()
    {
        while (true)
        {
            if (!s_pMutexJoin->try_lock())
                continue;

            bool bJoin = s_bJoin;
            s_pMutexJoin->unlock();

            if (bJoin)
                break;

            if (!s_pMutexTasks->try_lock())
                continue;

            void (*fp)() = nullptr;
            if (s_queueTasks.size() > 0ull)
            {
                fp = s_queueTasks.front();
                s_queueTasks.pop();
            }

            s_pMutexTasks->unlock();

            if (fp != nullptr)
                fp();
        }
    }

public:

    enum class EResult : unsigned long
    {
        Success = 0ul,
        Fail_Undefined = 1ul,
        Fail_InvalidThreadCount = 2ul,
        Fail_ArgumentNull = 3ul
    };

    static const EResult Join()
    {
        if (s_ppThreads == nullptr)
        {
            if (s_lThreadCount == 0l)
                return EResult::Success;
            else
                return EResult::Fail_Undefined;
        }
        else
        {
            if (s_lThreadCount <= 0l)
                return EResult::Fail_Undefined;
            else
            {
                s_pMutexJoin->lock();
                s_bJoin = true;
                s_pMutexJoin->unlock();

                for (long i = 0l; i < s_lThreadCount; ++i)
                {
                    s_ppThreads[i]->join();
                    s_ppThreads[i] = nullptr;
                }

                delete s_ppThreads;
                s_ppThreads = nullptr;
                s_lThreadCount = 0l;

                s_pMutexJoin->lock();
                s_bJoin = false;
                s_pMutexJoin->unlock();
            }
        }

        return EResult::Success;
    }

    static const EResult CreateThreads(const long _lThreadCount)
    {
        if (_lThreadCount < 0l)
            return EResult::Fail_InvalidThreadCount;

        if (Join() != EResult::Success)
            return EResult::Fail_Undefined;

        if (_lThreadCount == 0l)
            return EResult::Success;

        s_ppThreads = new std::thread*[_lThreadCount]{};

        for (long i = 0l; i < _lThreadCount; ++i)
            s_ppThreads[i] = new std::thread(ThreadFunction);

        s_lThreadCount = _lThreadCount;

        return EResult::Success;
    }

    static const EResult AddTask(void (*_fp)())
    {
        if (_fp == nullptr)
            return EResult::Fail_ArgumentNull;

        s_pMutexTasks->lock();
        s_queueTasks.push(_fp);
        s_pMutexTasks->unlock();

        return EResult::Success;
    }
};

std::thread** CYSThreadPool::s_ppThreads = nullptr;
long CYSThreadPool::s_lThreadCount = 0l;
bool CYSThreadPool::s_bJoin = false;
std::mutex* CYSThreadPool::s_pMutexJoin = new std::mutex();
std::queue<void (*)()> CYSThreadPool::s_queueTasks;
std::mutex* CYSThreadPool::s_pMutexTasks = new std::mutex();

void Test0()
{
    for (long i = 0l; i < 100000l; ++i)
    {
        std::cout << "A";
    }
}

int main()
{
    CYSThreadPool::EResult eResult = CYSThreadPool::Join();
    eResult = CYSThreadPool::CreateThreads(-1l);
    eResult = CYSThreadPool::CreateThreads(1l);
    eResult = CYSThreadPool::CreateThreads(0l);
    eResult = CYSThreadPool::CreateThreads(2l);

    CYSThreadPool::AddTask(Test0);

    Sleep(1000ul);

    CYSThreadPool::Join();

    return 0;
}

Добрый и продуманный ответ будет по-настоящему оценен!

РЕДАКТИРОВАТЬ:

Что я имел в виду под «Функцией» в абстрактной форме '- получить любые типы функций, кроме void () (). Например, void () (long), void () (ID3D12Resource1 , IDXGISwapChain) или HResult () (ID3D12Device6 , ID3D12Resource1 *, IDXGISwapChain) и т. Д. 1019 *

Спасибо за комментарий об использовании condition_variable вместо try_lock и оборачивании fp () в try-catch. Извините, но я не думаю, что понял, о чем вы. Могу ли я попросить более подробную информацию?

1 Ответ

2 голосов
/ 13 марта 2020

Итак, вот несколько вещей, которые могут улучшить ваш код:

  1. Вы можете использовать std :: function вместо void(*)(). Это позволит вам вызывать любые вызываемые, а не только указатели функций, заключая их в лямбду, например, CYSThreadPool::AddTask([]() { Add(3, 1); });.
  2. Еще лучше создать абстрактный базовый класс, скажем Task с виртуальным .Run() метод и использовать его вместо. Вы можете извлечь FunctionPointerTask или StdFunctionTask (вы можете изменить имена) из него, если хотите. Вы можете создать, скажем, ProcessResource1Task и передать необходимые аргументы в конструктор, чтобы .Run() только выполнял его.
  3. Ваши ThreadFunction работники очень неэффективны. Вы все время крутитесь на try_lock. Не. Вместо этого используйте condition_variables .
  4. Еще лучше, переместите объект очереди в отдельный потокобезопасный класс очереди.
  5. Ваш код не безопасен для исключений. Не используйте ручную блокировку / try_lock, вместо этого используйте lock_guard или unique_lock . Как это ваш код может легко тупик. Если вы должны использовать ручную блокировку / try_lock, тогда вы должны использовать try-catch.
  6. Множество переменных * stati c. Не. Сделайте их всех нестатичными c членами, добавьте соответствующие конструкторы и деструктор. Ваш пул потоков после присоединения полностью непригоден. Plus Join () не является потокобезопасным (было бы менее важно, если бы пул потоков не был полностью c).
  7. Вы можете улучшить производительность, установив bool s_bJoin atomi c . Нет необходимости читать / записывать его под замком.

Могут быть и другие проблемы, это то, что я сейчас вижу.

Вы также можете просмотреть это код для неплохой реализации пула потоков. Он короткий и использует большинство из приведенных выше советов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...