Я реализовал пул потоков на C ++, в котором я создаю Nthread
воркеров, которым я назначаю некоторые задания из очереди, в которую продолжаю продвигать задачу. Когда очередь пуста и / или когда я так говорю, потоки перестают работать. Все работает на WSL (Ubuntu 20.04 Focal).
Если я открываю Windows диспетчер задач при запуске программы, количество реально работающих потоков больше, чем выделенное. Например, если я запускаю программу с 4 потоками на машине с 12 ядрами, я вижу как минимум 6 ядер с активностью выше среднего; если я использую 10 потоков, все 12 ядер go до 100%. Ожидается ли такое поведение или я что-то делаю не так? Я ожидал увидеть на один поток больше, чем выделенные, потому что я порождаю потоки Nthread
из основного (который, кстати, должен молчать, ожидая, пока остальные закончат sh ...), но я не могу объяснить, что Понятно.
Я хочу подчеркнуть, что я создаю все потоки Nthread
перед всеми операциями, затем заполняю и обрабатываю очередь и, наконец, уничтожаю потоки, то есть, насколько я могу видеть, я не постоянно создавать / уничтожать потоки во время вычислений.
EDIT
Я забыл упомянуть, что работаю под C ++ 11. Вот соответствующий код C ++.
В основном. cc
ThreadPool *pool = new ThreadPool(fNThreads);
std::vector<std::function<void(int)>> *caller =
new std::vector<std::function<void(int)>>;
for (size_t iter = 0; iter < nIter; ++iter)
{
pool->ResetQueue();
for (size_t j = 0; nmax < 2; ++j)
{
caller->push_back(
[=](int iThr){function(iter, j, iThr);});
pool->PushTask((*caller)[j]);
}
pool->WaitForCompletion(1.e-4);
caller->clear();
}
delete caller;
delete pool;
SynchronizedQueue.hh
#ifndef SYNCQUEUE_H
#define SYNCQUEUE_H
#include <list>
#include <mutex>
#include <condition_variable>
template<typename T>
class SynchronizedQueue
{
public:
SynchronizedQueue();
~SynchronizedQueue();
void Put(T const & data);
void Put(T const && data);
T Get();
size_t Size();
SynchronizedQueue(SynchronizedQueue const &) = delete;
SynchronizedQueue & operator=(SynchronizedQueue const &) = delete;
SynchronizedQueue(SynchronizedQueue&&) = delete;
SynchronizedQueue & operator=(SynchronizedQueue&&) = delete;
private:
std::list<T> queue;
std::mutex mut;
std::condition_variable condvar;
};
template<typename T>
SynchronizedQueue<T>::SynchronizedQueue()
{}
template<typename T>
SynchronizedQueue<T>::~SynchronizedQueue()
{}
template<typename T>
void SynchronizedQueue<T>::Put(T const & data)
{
std::unique_lock<std::mutex> lck(mut);
queue.push_back(data);
condvar.notify_one();
}
template<typename T>
T SynchronizedQueue<T>::Get()
{
std::unique_lock<std::mutex> lck(mut);
while (queue.empty())
{
condvar.wait(lck);
}
T result = queue.front();
queue.pop_front();
return result;
}
template<typename T>
size_t SynchronizedQueue<T>::Size()
{
std::unique_lock<std::mutex> lck(mut);
return queue.size();
}
#endif
ThreadPool.hh
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include "SynchronizedQueue.hh"
#include <atomic>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
class ThreadPool
{
public:
ThreadPool(unsigned int nThreads = 1);
virtual ~ThreadPool();
void PushTask(std::function<void(int)> func);
void WaitForCompletion();
void WaitForCompletion(int sec);
void ResetQueue();
void JoinThreads();
void Delay(int sec);
size_t GetWorkQueueLength();
private:
void WorkerThread(int i);
std::atomic<bool> done;
unsigned int threadCount;
SynchronizedQueue<std::function<void(int)>> workQueue;
std::vector<std::thread> threads;
};
#endif
пул потоков cc
#include "ThreadPool.hh"
#include "SynchronizedQueue.hh"
#include <chrono>
//#include <iostream>
void doNothing(int i)
{}
ThreadPool::ThreadPool(unsigned int nThreads)
: done(false)
{
if (nThreads <= 0)
{
threadCount = std::thread::hardware_concurrency();
}
else
{
threadCount = nThreads;
}
for (unsigned int i = 0; i < threadCount; ++i)
{
threads.push_back(std::thread(&ThreadPool::WorkerThread, this, i));
}
}
ThreadPool::~ThreadPool()
{
WaitForCompletion();
JoinThreads();
}
void ThreadPool::WaitForCompletion(int sec)
{
if (!done)
{
while (GetWorkQueueLength())
{
std::this_thread::sleep_for(std::chrono::seconds(sec));
}
done = true;
for (unsigned int i = 0; i < threadCount; ++i)
{
PushTask(&doNothing);
}
}
}
void ThreadPool::WaitForCompletion()
{
if (!done)
{
while (GetWorkQueueLength())
{}
done = true;
for (unsigned int i = 0; i < threadCount; ++i)
{
PushTask(&doNothing);
}
}
}
void ThreadPool::JoinThreads()
{
for (auto& th : threads)
{
if (th.joinable())
{
th.join();
}
}
}
void ThreadPool::Delay(int sec)
{
std::this_thread::sleep_for(std::chrono::seconds(sec));
}
void ThreadPool::PushTask(std::function<void(int)> func)
{
workQueue.Put(func);
}
void ThreadPool::ResetQueue()
{
done = false;
}
void ThreadPool::WorkerThread(int i)
{
while (!done)
{
workQueue.Get()(i);
}
}
size_t ThreadPool::GetWorkQueueLength()
{
return workQueue.Size();
}