After some further investigation it turned out that the profiler results needed some additional interpretation. Besides that, there were quite a few surprisingly inefficient pieces of code in the main loop. After taking care of the worst offenders and parallelizing the loop in a few places, I am getting a decent performance boost. With a sufficiently large problem I can reach up to 60 % average CPU utilization on a 4C/8T CPU. That is nowhere near as good as it would be with OpenMP, but it is better than nothing.
For future reference, here is the OpenMP-like thread pool for parallelizing for loops that I ended up with.
SThreadPool.h
#ifndef STHREADPOOL_H
#define STHREADPOOL_H
#include <stdexcept>
#include <vector>
#if (__cplusplus >= 201103L) || (defined(_MSC_VER) && (_MSC_VER >= 1900))
#define STP_HAVE_CPP11
#endif // CPP11 check
#ifdef STP_HAVE_CPP11
#define STP_NOTHROW noexcept
#else
#define STP_NOTHROW throw()
#endif // STP_HAVE_CPP11
namespace stpool {
class Exception : public std::runtime_error {
public:
explicit Exception(const char *msg);
};
enum WorkerResult {
WR_SUCCESS, /*!< Worker finished correctly, results are OK */
WR_FAILURE, /*!< Worker finished abnormally, results should be discarded */
WR_SKIPPED, /*!< Worker did not execute because the job size was too small */
WR_INVALID /*!< This state must never be returned */
};
class Barrier;
class WorkerPrivate;
class Worker {
public:
Worker();
WorkerResult result; /*!< Result of the last worker cycle */
WorkerPrivate *priv; /*!< Internal worker data, do not touch! */
private:
Worker(const Worker &other);
Worker & operator=(const Worker &other);
};
typedef std::vector<Worker *> WorkerVec;
typedef const std::vector<Worker *> & CWorkerVecRef;
typedef void * Payload;
typedef std::vector<Payload> PayloadVec;
/*!
* WorkerFunc prototype.
*
* @param[in] from First index of the portion of the loop
* @param[in] to Last index of the portion of the loop (inclusive or exclusive, depending on the TerminationPolicy used)
* @param[in,out] p Data specific for the given job
*
* @retval true Job succeeded
* @retval false Job failed
*/
typedef bool (*WorkerFunc)(const int from, const int to, Payload p);
/*!
* Specifies the condition for the last element in the loop
*/
enum TerminationPolicy {
TPOL_INCLUSIVE, /*!< "<=" terminating condition */
TPOL_EXCLUSIVE /*!< "<" terminating condition */
};
/*!
* ThreadPool object capable of executing for-loops in parallel.
* Think of this as poor man's OpenMP...
*/
class ThreadPool {
public:
/*!
* ThreadPool c-tor
*
* @param[in] NThreads Number of worker threads to prepare
*/
ThreadPool(const long NThreads);
~ThreadPool();
/*!
* Runs the parallel job. Payloads must be set up before this function
* is called.
*
* @param[in] from First index in the loop
* @param[in] to Last index in the loop (treated as inclusive or exclusive according to the Policy template parameter)
* @param[in,out] payloads Vector of data specific for the given loop.
* Size of the vector must be the same as the number
* of worker threads.
* @param[in] func Function that performs the actual calculation
*
* @return Vector of finished workers
*/
template <TerminationPolicy Policy>
CWorkerVecRef Process(const int from, const int to,
const PayloadVec &payloads, WorkerFunc func) STP_NOTHROW;
/*!
* Returns number of available worker threads
*
* @return Number of available worker threads
*/
long Threads() const;
private:
ThreadPool(const ThreadPool &other);
ThreadPool & operator=(const ThreadPool &other);
void Cleanup();
void PrepareWorker(Worker *wrk, const int from, const int to,
Payload payload, WorkerFunc func);
void SkipWorker(Worker *wrk);
WorkerVec m_workers;
Barrier *m_barrier;
const long m_NThreads;
};
/*!
* Number of available logical CPUs
*/
int NumOfCPUs();
} // namespace stpool
#endif // STHREADPOOL_H
SThreadPool.cpp
#include "SThreadPool.h"
#include <Windows.h>
#include <cassert>
#define STP_USE_SRWLOCK 1
#define STP_USE_SYSTEM_SRWLOCK 0
#define STP_SPIN_FOR_RDVZ 0
/* Spinning for rendezvous seems to be more efficient only
 * when there are fewer workers than available CPUs */
#if STP_USE_SRWLOCK
#if !STP_USE_SYSTEM_SRWLOCK
#define SRWLOCK_INIT RTL_SRWLOCK_INIT
typedef RTL_SRWLOCK SRWLOCK, *PSRWLOCK;
#endif // STP_USE_SYSTEM_SRWLOCK
#define LOCK_PRIM SRWLOCK
#define InitLock(lk) ::InitializeSRWLock(lk)
#define DeleteLock(lk)
#define AcquireLkExcl(lk) ::AcquireSRWLockExclusive(lk)
#define AcquireLkShared(lk) ::AcquireSRWLockShared(lk)
#define ReleaseLkExcl(lk) ::ReleaseSRWLockExclusive(lk)
#define ReleaseLkShared(lk) ::ReleaseSRWLockShared(lk)
#define WaitCondExcl(wc, lk) ::SleepConditionVariableSRW(wc, lk, INFINITE, 0)
#define WaitCondShared(wc, lk) ::SleepConditionVariableSRW(wc, lk, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED)
#else
#define LOCK_PRIM CRITICAL_SECTION
#define InitLock(lk) ::InitializeCriticalSectionAndSpinCount(lk, 5000)
#define DeleteLock(lk) ::DeleteCriticalSection(lk)
#define AcquireLkExcl(lk) ::EnterCriticalSection(lk)
#define AcquireLkShared(lk) ::EnterCriticalSection(lk)
#define ReleaseLkExcl(lk) ::LeaveCriticalSection(lk)
#define ReleaseLkShared(lk) ::LeaveCriticalSection(lk)
#define WaitCondExcl(wc, lk) ::SleepConditionVariableCS(wc, lk, INFINITE)
#define WaitCondShared(wc, lk) ::SleepConditionVariableCS(wc, lk, INFINITE)
#endif // STP_USE_SRWLOCK
namespace stpool {
Exception::Exception(const char *msg) :
std::runtime_error(msg)
{
}
class Barrier {
public:
Barrier(const long workers) :
working(0),
workers(workers)
{
InitLock(&lock);
::InitializeConditionVariable(&waitForWork);
#if !STP_SPIN_FOR_RDVZ
::InitializeConditionVariable(&workDone);
#endif // STP_SPIN_FOR_RDVZ
}
~Barrier()
{
DeleteLock(&lock);
}
void Rendezvous();
volatile long working;
LOCK_PRIM lock;
CONDITION_VARIABLE waitForWork;
#if !STP_SPIN_FOR_RDVZ
CONDITION_VARIABLE workDone;
#endif // STP_SPIN_FOR_RDVZ
private:
const long workers;
};
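/* Wakes up all workers and blocks until every dispatched job has finished.
 * The caller must hold the barrier lock exclusively when calling this. */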
void Barrier::Rendezvous()
{
ReleaseLkExcl(&lock);
::WakeAllConditionVariable(&waitForWork);
#if !STP_SPIN_FOR_RDVZ
AcquireLkExcl(&lock);
while (working > 0)
WaitCondExcl(&workDone, &lock);
ReleaseLkExcl(&lock);
#else
while (working > 0)
YieldProcessor();
#endif // STP_SPIN_FOR_RDVZ
}
Worker::Worker() :
result(WR_FAILURE)
{
}
class WorkerPrivate {
public:
WorkerPrivate() :
from(-1),
to(-1),
payload(NULL),
func(NULL),
process(false),
terminate(false),
failed(false),
threadId(0),
hThread(NULL),
barrier(NULL)
{
}
int from;
int to;
Payload payload;
WorkerFunc func;
bool process;
bool terminate;
bool failed;
DWORD threadId;
HANDLE hThread;
Barrier *barrier;
};
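/* Worker thread main loop: sleeps on the barrier's condition variable until
 * a job is dispatched or termination is requested, runs the job and then
 * signals completion by decrementing the barrier's "working" counter. */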
static
DWORD WINAPI ThreadProc(LPVOID param)
{
Worker *wrk = static_cast<Worker *>(param);
WorkerPrivate *priv = wrk->priv;
while (true) {
#ifdef STP_PRN_TPTS
{
AnsiString str("Worker waiting: ");
str += int(priv->threadId);
OutputDebugStringA(str.c_str());
}
#endif // STP_PRN_TPTS
AcquireLkShared(&priv->barrier->lock);
while (!priv->process && !priv->terminate)
WaitCondShared(&priv->barrier->waitForWork, &priv->barrier->lock);
ReleaseLkShared(&priv->barrier->lock);
if (priv->terminate)
return 0;
assert(priv->payload != NULL);
assert(priv->func != NULL);
const bool ret = priv->func(priv->from, priv->to, priv->payload);
wrk->result = ret ? WR_SUCCESS : WR_FAILURE;
priv->process = false;
#ifdef STP_PRN_TPTS
{
AnsiString str("Worker done: ");
str += int(priv->threadId);
OutputDebugStringA(str.c_str());
}
#endif // STP_PRN_TPTS
#if !STP_SPIN_FOR_RDVZ
AcquireLkExcl(&priv->barrier->lock);
priv->barrier->working--;
if (priv->barrier->working == 0) {
ReleaseLkExcl(&priv->barrier->lock);
::WakeConditionVariable(&priv->barrier->workDone);
} else
ReleaseLkExcl(&priv->barrier->lock);
#else
::InterlockedDecrement(&priv->barrier->working);
#endif // STP_SPIN_FOR_RDVZ
}
return 0;
}
ThreadPool::ThreadPool(const long NThreads) :
m_NThreads(NThreads)
{
if (m_NThreads < 1)
throw Exception("Invalid argument");
m_barrier = new Barrier(m_NThreads);
m_workers.reserve(NThreads);
for (int thr = 0; thr < m_NThreads ; thr++) {
Worker *wrk = new Worker();
WorkerPrivate *priv = new WorkerPrivate;
DWORD thrId;
/* Create the thread suspended so that it cannot read wrk->priv
 * before the private data has been fully set up */
HANDLE hThread = ::CreateThread(NULL, 0, ThreadProc, wrk, CREATE_SUSPENDED, &thrId);
if (hThread == NULL) {
delete priv;
delete wrk;
Cleanup();
throw Exception("Failed to initialize thread pool");
}
priv->threadId = thrId;
priv->hThread = hThread;
priv->barrier = m_barrier;
wrk->priv = priv;
m_workers.push_back(wrk);
::ResumeThread(hThread);
}
}
ThreadPool::~ThreadPool()
{
Cleanup();
delete m_barrier;
}
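/* Flags every worker for termination, wakes them all up, waits for the
 * threads to exit and releases the per-worker resources. */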
void ThreadPool::Cleanup()
{
AcquireLkExcl(&m_barrier->lock);
for (size_t idx = 0; idx < m_workers.size(); idx++)
m_workers[idx]->priv->terminate = true;
ReleaseLkExcl(&m_barrier->lock);
::WakeAllConditionVariable(&m_barrier->waitForWork);
for (size_t idx = 0; idx < m_workers.size(); idx++) {
Worker *wrk = m_workers[idx];
if (!wrk->priv->failed) {
::WaitForSingleObject(wrk->priv->hThread, INFINITE);
::CloseHandle(wrk->priv->hThread);
}
delete wrk->priv;
delete wrk;
}
}
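/* Splits the range [from, to) into NThreads contiguous slices and hands one
 * slice to each worker; the calling thread blocks until all workers finish. */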
template <>
CWorkerVecRef ThreadPool::Process<TPOL_EXCLUSIVE>(const int from, const int to,
const PayloadVec &payloads, WorkerFunc func) STP_NOTHROW
{
assert(to > from);
assert(m_workers.size() == payloads.size());
const int slice = int(float(to - from) / m_NThreads + 0.5F);
#ifdef STP_PRN_TPTS
OutputDebugStringA("--- Para start");
#endif // STP_PRN_TPTS
int idx = from;
AcquireLkExcl(&m_barrier->lock);
for (long thr = 0; thr < m_NThreads - 1; thr++) {
Worker *wrk = m_workers[thr];
Payload pl = payloads[thr];
if (idx > to) {
/* Mark the worker as skipped and keep going so that every
 * remaining worker ends up with a well-defined result */
SkipWorker(wrk);
continue;
}
int realTo = idx + slice;
if (realTo > to)
realTo = to;
PrepareWorker(wrk, idx, realTo, pl, func);
idx += slice;
}
Worker *wrk = m_workers.back();
Payload pl = payloads.back();
if (idx <= to)
PrepareWorker(wrk, idx, to, pl, func);
else
SkipWorker(wrk);
m_barrier->Rendezvous();
return m_workers;
}
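/* Same as the exclusive variant, but the range is [from, to] and each
 * worker is expected to treat its "to" index as inclusive. */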
template <>
CWorkerVecRef ThreadPool::Process<TPOL_INCLUSIVE>(const int from, const int to,
const PayloadVec &payloads, WorkerFunc func) STP_NOTHROW
{
assert(to >= from);
assert(m_workers.size() == payloads.size());
const int slice = int(float(to - from) / m_NThreads + 0.5F);
#ifdef STP_PRN_TPTS
OutputDebugStringA("--- Para start");
#endif // STP_PRN_TPTS
int idx = from;
AcquireLkExcl(&m_barrier->lock);
for (long thr = 0; thr < m_NThreads - 1; thr++) {
Worker *wrk = m_workers[thr];
Payload pl = payloads[thr];
if (idx > to) {
/* Mark the worker as skipped and keep going so that every
 * remaining worker ends up with a well-defined result */
SkipWorker(wrk);
continue;
}
int realTo = idx + slice;
if (realTo > to)
realTo = to;
PrepareWorker(wrk, idx, realTo, pl, func);
idx += slice + 1;
}
Worker *wrk = m_workers.back();
Payload pl = payloads.back();
if (idx <= to)
PrepareWorker(wrk, idx, to, pl, func);
else
SkipWorker(wrk);
m_barrier->Rendezvous();
return m_workers;
}
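/* Fills in the job description for a worker and marks it as busy.
 * Must be called with the barrier lock held exclusively. */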
void ThreadPool::PrepareWorker(Worker *wrk, const int from, const int to,
Payload payload, WorkerFunc func)
{
WorkerPrivate *priv = wrk->priv;
wrk->result = WR_INVALID;
priv->from = from;
priv->to = to;
priv->payload = payload;
priv->func = func;
priv->process = true;
m_barrier->working++;
}
void ThreadPool::SkipWorker(Worker *wrk)
{
wrk->result = WR_SKIPPED;
}
long ThreadPool::Threads() const
{
return m_NThreads;
}
int NumOfCPUs()
{
SYSTEM_INFO info;
GetSystemInfo(&info);
return info.dwNumberOfProcessors;
}
} // namespace stpool
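For completeness, below is a minimal usage sketch (not part of the pool itself); the SumPayload struct, SumWorker function and ParallelSum wrapper are made-up names for illustration. It sums an array in parallel by giving each worker its own partial-sum slot and combining the slots afterwards.

#include "SThreadPool.h"
#include <vector>

/* Hypothetical per-worker payload: shared input plus a private partial sum */
struct SumPayload {
    const double *data;
    double partial;
};

/* WorkerFunc for TPOL_EXCLUSIVE: processes indices [from, to) */
static bool SumWorker(const int from, const int to, stpool::Payload p)
{
    SumPayload *pl = static_cast<SumPayload *>(p);
    for (int i = from; i < to; i++)
        pl->partial += pl->data[i];
    return true;
}

double ParallelSum(const double *data, const int N)
{
    stpool::ThreadPool pool(stpool::NumOfCPUs());
    /* One payload slot per worker thread */
    std::vector<SumPayload> slots(pool.Threads());
    stpool::PayloadVec payloads;
    for (long thr = 0; thr < pool.Threads(); thr++) {
        slots[thr].data = data;
        slots[thr].partial = 0.0;
        payloads.push_back(&slots[thr]);
    }
    /* Dispatch the loop 0 <= i < N across the workers and wait for them */
    stpool::CWorkerVecRef workers =
        pool.Process<stpool::TPOL_EXCLUSIVE>(0, N, payloads, SumWorker);
    double sum = 0.0;
    for (size_t idx = 0; idx < workers.size(); idx++) {
        if (workers[idx]->result == stpool::WR_FAILURE)
            return 0.0; /* Handle the failure as appropriate */
        sum += slots[idx].partial; /* Skipped workers contribute 0.0 */
    }
    return sum;
}

Workers and payloads are matched by index, so the partial results can be read back from the same slots that were passed in.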