Выделено больше потоков, чем хотелось - PullRequest
0 голосов
/ 10 июля 2020

Я реализовал пул потоков на C ++, в котором я создаю Nthread воркеров, которым я назначаю некоторые задания из очереди, в которую продолжаю продвигать задачу. Когда очередь пуста и / или когда я так говорю, потоки перестают работать. Все работает на WSL (Ubuntu 20.04 Focal).

Если я открываю Windows диспетчер задач при запуске программы, количество реально работающих потоков больше, чем выделенное. Например, если я запускаю программу с 4 потоками на машине с 12 ядрами, я вижу как минимум 6 ядер с активностью выше среднего; если я использую 10 потоков, все 12 ядер go до 100%. Ожидается ли такое поведение или я что-то делаю не так? Я ожидал увидеть на один поток больше, чем выделенные, потому что я порождаю потоки Nthread из основного (который, кстати, должен молчать, ожидая, пока остальные закончат sh ...), но я не могу объяснить, что Понятно.

Я хочу подчеркнуть, что я создаю все потоки Nthread перед всеми операциями, затем заполняю и обрабатываю очередь и, наконец, уничтожаю потоки, то есть, насколько я могу видеть, я не постоянно создавать / уничтожать потоки во время вычислений.

EDIT

Я забыл упомянуть, что работаю под C ++ 11. Вот соответствующий код C ++.

В основном. cc

ThreadPool *pool = new ThreadPool(fNThreads);
std::vector<std::function<void(int)>> *caller = 
    new std::vector<std::function<void(int)>>;

for (size_t iter = 0; iter < nIter; ++iter)
{
    pool->ResetQueue();
    for (size_t j = 0; nmax < 2; ++j)
    {
        caller->push_back( 
            [=](int iThr){function(iter, j, iThr);});
        pool->PushTask((*caller)[j]);
    }
    pool->WaitForCompletion(1.e-4);
    caller->clear();
}

delete caller;
delete pool; 

SynchronizedQueue.hh

#ifndef SYNCQUEUE_H
#define SYNCQUEUE_H

#include <list>
#include <mutex>
#include <condition_variable>

template<typename T>
class SynchronizedQueue
{
    public:
        SynchronizedQueue();
        ~SynchronizedQueue();
        
        void Put(T const & data);
        void Put(T const && data);
        T Get();
        size_t Size();
        
        SynchronizedQueue(SynchronizedQueue const &) = delete;
        SynchronizedQueue & operator=(SynchronizedQueue const &) = delete;
        SynchronizedQueue(SynchronizedQueue&&) = delete;
        SynchronizedQueue & operator=(SynchronizedQueue&&) = delete;
        
    private:
        std::list<T> queue;
        std::mutex mut;
        std::condition_variable condvar;
};

template<typename T>
SynchronizedQueue<T>::SynchronizedQueue()
{}

template<typename T>
SynchronizedQueue<T>::~SynchronizedQueue()
{}

template<typename T>
void SynchronizedQueue<T>::Put(T const & data)
{
    std::unique_lock<std::mutex> lck(mut);
    queue.push_back(data);
    condvar.notify_one();
}

template<typename T>
T SynchronizedQueue<T>::Get()
{
    std::unique_lock<std::mutex> lck(mut);
    while (queue.empty())
    {
        condvar.wait(lck);
    }
    T result = queue.front();
    queue.pop_front();
    return result;
}

template<typename T>
size_t SynchronizedQueue<T>::Size()
{
    std::unique_lock<std::mutex> lck(mut);
    return queue.size();
}

#endif

ThreadPool.hh

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include "SynchronizedQueue.hh"

#include <atomic>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class ThreadPool
{
    public:
        ThreadPool(unsigned int nThreads = 1);
        virtual ~ThreadPool();
        
        void PushTask(std::function<void(int)> func);
        void WaitForCompletion();
        void WaitForCompletion(int sec);
        void ResetQueue();
        void JoinThreads();
        void Delay(int sec);
        size_t GetWorkQueueLength();
        
    private:
        void WorkerThread(int i);
        
        std::atomic<bool> done;
        unsigned int threadCount;
        SynchronizedQueue<std::function<void(int)>> workQueue;
        std::vector<std::thread> threads;
};


#endif

пул потоков cc

#include "ThreadPool.hh"
#include "SynchronizedQueue.hh"

#include <chrono>
//#include <iostream>

void doNothing(int i)
{}

ThreadPool::ThreadPool(unsigned int nThreads)
    : done(false)
{
    if (nThreads <= 0)
    {
        threadCount = std::thread::hardware_concurrency();
    } 
    else
    {
        threadCount = nThreads;
    }
    
    for (unsigned int i = 0; i < threadCount; ++i)
    {
        threads.push_back(std::thread(&ThreadPool::WorkerThread, this, i));
    }
}

ThreadPool::~ThreadPool()
{
    WaitForCompletion();
    JoinThreads();
}

void ThreadPool::WaitForCompletion(int sec)
{
    if (!done)
    {
        while (GetWorkQueueLength()) 
        {
            std::this_thread::sleep_for(std::chrono::seconds(sec));
        }
        done = true;
        for (unsigned int i = 0; i < threadCount; ++i)
        {
            PushTask(&doNothing);
        } 
    }
}

void ThreadPool::WaitForCompletion()
{
    if (!done)
    {
        while (GetWorkQueueLength()) 
        {}
        done = true;
        for (unsigned int i = 0; i < threadCount; ++i)
        {
            PushTask(&doNothing);
        } 
    }
}

void ThreadPool::JoinThreads()
{
    for (auto& th : threads)
    {
        if (th.joinable())
        {
            th.join();
        }
    }
}


void ThreadPool::Delay(int sec)
{
    std::this_thread::sleep_for(std::chrono::seconds(sec));
}

void ThreadPool::PushTask(std::function<void(int)> func)
{
    workQueue.Put(func);
}

void ThreadPool::ResetQueue()
{
    done = false;
}

void ThreadPool::WorkerThread(int i)
{
    while (!done)
    {
        workQueue.Get()(i);
    }
}

size_t ThreadPool::GetWorkQueueLength()
{
    return workQueue.Size();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...