Disappointing performance of a custom thread pool
0 votes
/ Feb 26, 2019

I am stuck with an old codebase that will not compile with anything but Borland compilers. The code does some number crunching that parallelizes well, but since the compiler does not support OpenMP, I tried to build my own thread pool on top of the WinAPI CONDITION_VARIABLE and SRWLOCK primitives. The pool itself seems to work fine, except that the performance is not much better than that of the single-threaded code. I find it hard to believe that the locking logic would be so heavy that it completely wipes out the benefits of parallel processing. A quick look in Process Explorer shows that each of my 8 workers (quad-core CPU with HT) uses about 0.5% of CPU time, which makes me think the workers spend most of their time sleeping. What am I missing here? And yes, I am sure that the bit I am trying to run in parallel is the hottest path.

Some relevant bits of the code:

class Barrier {
public:
    Barrier(const int workers) :
            working(0),
            workers(workers)
    {
            ::InitializeSRWLock(&lock);
            ::InitializeConditionVariable(&waitForWork);
            ::InitializeConditionVariable(&workDone);
    }

    ~Barrier()
    {
    }

    /* Expects "lock" to be held exclusively by the caller */
    void Rendezvous()
    {
            ::ReleaseSRWLockExclusive(&lock);

            ::WakeAllConditionVariable(&waitForWork);

            /* Wait until every worker has finished its slice */
            ::AcquireSRWLockExclusive(&lock);
            while (working > 0)
                    ::SleepConditionVariableSRW(&workDone, &lock, INFINITE, 0);
            ::ReleaseSRWLockExclusive(&lock);
    }

    volatile long working;

    SRWLOCK lock;
    CONDITION_VARIABLE waitForWork;
    CONDITION_VARIABLE workDone;

private:
    const long workers;
};

class Worker {
public:
    Worker(Barrier *_bar) :
            /* Some worker data */
            processing(false),
            terminate(false),
            failed(false),
            hThread(NULL),
            threadId(0),
            bar(_bar)
    {
    }

    /* Some worker data */

    bool processing; /* Set when a new batch of work is ready for this worker */
    bool terminate;
    bool failed;

    HANDLE hThread;
    DWORD threadId;

    Barrier *bar;

private:
    Worker(const Worker &other)
    {
    }
};

bool WorkingBlock::Process(/* Some worker data */)
{
    ::AcquireSRWLockExclusive(&m_barrier->lock); /* Released inside Rendezvous() */
    for (int thr = 0; thr < int(m_NThreads); thr++) {
            Worker *wrk = m_workers->operator[](thr);
            /* Setup workers */
            PrepareWorker(wrk); /* This increments the "working" variable in barrier */
            wrk->processing = true;
    }

    /* Wait till workers finish */
    m_barrier->Rendezvous();

    /* Process results */
}

inline
DWORD WINAPI WorkerProc(LPVOID param)
{
    Worker *wrk = static_cast<Worker *>(param);

    while (true) {
        /* Sleep until there is work for this worker or we are told to quit */
        ::AcquireSRWLockShared(&wrk->bar->lock);
        while (!wrk->processing && !wrk->terminate)
            ::SleepConditionVariableSRW(&wrk->bar->waitForWork, &wrk->bar->lock,
                                        INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED);
        ::ReleaseSRWLockShared(&wrk->bar->lock);

        if (wrk->terminate) {
            return 0;
        }

        /* Do the calculation */

        wrk->processing = false;

        /* The last worker to finish wakes up the rendezvous */
        ::AcquireSRWLockExclusive(&wrk->bar->lock);
        wrk->bar->working--;
        if (wrk->bar->working == 0) {
            ::ReleaseSRWLockExclusive(&wrk->bar->lock);
            ::WakeConditionVariable(&wrk->bar->workDone);
        } else
            ::ReleaseSRWLockExclusive(&wrk->bar->lock);
    }

    return 0;
}

I spin up the worker threads in advance and let them sleep until a new batch of work is ready. Is there some synchronization contention I am missing?

Thanks...

EDIT: added the usage of the Worker's processing flag to the code.

1 Answer

0 votes
/ Feb 28, 2019

After some further investigation it turned out that the profiler results needed some extra interpretation. There were also quite a few unexpectedly inefficient bits of code in the main loop. After taking care of the worst of those and parallelizing the loop in a few places, I am getting a decent performance boost. With a large enough problem size I can get up to 60% average CPU utilization on a 4C/8T CPU. That is nowhere near as good as OpenMP would do, but it is better than nothing.

For future reference, this is the OpenMP-like thread pool for parallelizing for loops that I ended up with; a brief usage sketch follows the listing.

SThreadPool.h

#ifndef STHREADPOOL_H
#define STHREADPOOL_H

#include <stdexcept>
#include <vector>

#if (__cplusplus >= 201103L) || (_MSC_VER >= 1900)
        #define STP_HAVE_CPP11
#endif // CPP11 check

#ifdef STP_HAVE_CPP11
        #define STP_NOTHROW noexcept
#else
        #define STP_NOTHROW throw()
#endif // STP_HAVE_CPP11

namespace stpool {

class Exception : public std::runtime_error {
public:
        explicit Exception(const char *msg);
};

enum WorkerResult {
        WR_SUCCESS,     /*!< Worker finished correctly, results are OK */
        WR_FAILURE,     /*!< Worker finished abnormally, results should be discarded */
        WR_SKIPPED,     /*!< Worker did not execute because the job size was too small */
        WR_INVALID      /*!< This state must never be returned */
};

class Barrier;
class WorkerPrivate;

class Worker {
public:
        Worker();

        WorkerResult result;    /*!< Result of the last worker cycle */
        WorkerPrivate *priv;    /*!< Internal worker data, do not touch! */

private:
        Worker(const Worker &other);
        Worker & operator=(const Worker &other);
};

typedef std::vector<Worker *> WorkerVec;
typedef const std::vector<Worker *> & CWorkerVecRef;
typedef void * Payload;
typedef std::vector<Payload> PayloadVec;

/*!
 * WorkerFunc prototype.
 *
 * @param[in] from First index of the portion of the loop
 * @param[in] to Last index of the portion of the loop
 * @param[in,out] p Data specific for the given job
 *
 * @retval true Job succeeded
 * @retval false Job failed
 */
typedef bool (*WorkerFunc)(const int from, const int to, Payload p);

/*!
 * Specifies the condition for the last element in the loop
 */
enum TerminationPolicy {
        TPOL_INCLUSIVE, /*!< "<=" terminating condition */
        TPOL_EXCLUSIVE  /*!< "<" terminating condition */
};

/*!
 * ThreadPool object capable of executing for-loops in parallel.
 * Think of this as poor man's OpenMP...
 */
class ThreadPool {
public:
        /*!
         * ThreadPool c-tor
         *
         * @param[in] NThreads Number of worker threads to prepare
         */
        ThreadPool(const long NThreads);
        ~ThreadPool();

        /*!
         * Runs the parallel job. Payloads must be set up before this function
         * is called.
         *
         * @param[in] from First index in the loop
         * @param[in] to Last index in the loop
         * @param[in,out] payloads Vector of data specific for the given loop.
         *                          Size of the vector must be the same as the number
         *                          of worker threads.
         * @param[in] func Function that performs the actual calculation
         *
         * @return Vector of finished workers
         */
        template <TerminationPolicy Policy>
        CWorkerVecRef Process(const int from, const int to,
                              const PayloadVec &payloads, WorkerFunc func) STP_NOTHROW;

        /*!
         * Returns number of available worker threads
         *
         * @return Number of available worker threads
         */
        long Threads() const;

private:
        ThreadPool(const ThreadPool &other);
        ThreadPool & operator=(const ThreadPool &other);

        void Cleanup();
        void PrepareWorker(Worker *wrk, const int from, const int to,
                           Payload payload, WorkerFunc func);
        void SkipWorker(Worker *wrk);

        WorkerVec m_workers;
        Barrier *m_barrier;

        const long m_NThreads;
};

/*!
 * Number of available logical CPUs
 */
int NumOfCPUs();

} // namespace stpool

#endif // STHREADPOOL_H

SThreadPool.cpp

#include "SThreadPool.h"

#include <Windows.h>

#include <cassert>

#define STP_USE_SRWLOCK 1
#define STP_USE_SYSTEM_SRWLOCK 0
#define STP_SPIN_FOR_RDVZ 0

/* Spinning for rendezvous seems to be more efficient only
 * when there are fewer workers than available CPUs */

#if STP_USE_SRWLOCK
        #if !STP_USE_SYSTEM_SRWLOCK
                #define SRWLOCK_INIT RTL_SRWLOCK_INIT
                typedef RTL_SRWLOCK SRWLOCK, *PSRWLOCK;
        #endif // USE_SYSTEM_SRWLOCK

        #define LOCK_PRIM SRWLOCK
        #define InitLock(lk) ::InitializeSRWLock(lk)
        #define DeleteLock(lk)
        #define AcquireLkExcl(lk) ::AcquireSRWLockExclusive(lk)
        #define AcquireLkShared(lk) ::AcquireSRWLockShared(lk)
        #define ReleaseLkExcl(lk) ::ReleaseSRWLockExclusive(lk)
        #define ReleaseLkShared(lk) ::ReleaseSRWLockShared(lk)
        #define WaitCondExcl(wc, lk) ::SleepConditionVariableSRW(wc, lk, INFINITE, 0)
        #define WaitCondShared(wc, lk) ::SleepConditionVariableSRW(wc, lk, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED)
#else
        #define LOCK_PRIM CRITICAL_SECTION
        #define InitLock(lk) ::InitializeCriticalSectionAndSpinCount(lk, 5000)
        #define DeleteLock(lk) ::DeleteCriticalSection(lk)
        #define AcquireLkExcl(lk) ::EnterCriticalSection(lk)
        #define AcquireLkShared(lk) ::EnterCriticalSection(lk)
        #define ReleaseLkExcl(lk) ::LeaveCriticalSection(lk)
        #define ReleaseLkShared(lk) ::LeaveCriticalSection(lk)
        #define WaitCondExcl(wc, lk) ::SleepConditionVariableCS(wc, lk, INFINITE)
        #define WaitCondShared(wc, lk) ::SleepConditionVariableCS(wc, lk, INFINITE)
#endif // STP_USE_SRWLOCK

namespace stpool {

Exception::Exception(const char *msg) :
        std::runtime_error(msg)
{
}

class Barrier {
public:
        Barrier(const long workers) :
                working(0),
                workers(workers)
        {
                InitLock(&lock);
                ::InitializeConditionVariable(&waitForWork);
        #if !STP_SPIN_FOR_RDVZ
                ::InitializeConditionVariable(&workDone);
        #endif // STP_SPIN_FOR_RDVZ
        }

        ~Barrier()
        {
                DeleteLock(&lock);
        }

        void Rendezvous();

        volatile long working;

        LOCK_PRIM lock;
        CONDITION_VARIABLE waitForWork;
#if !STP_SPIN_FOR_RDVZ
        CONDITION_VARIABLE workDone;
#endif // STP_SPIN_FOR_RDVZ

private:
        const long workers;
};

void Barrier::Rendezvous()
{
        ReleaseLkExcl(&lock);

        ::WakeAllConditionVariable(&waitForWork);

#if !STP_SPIN_FOR_RDVZ
        AcquireLkExcl(&lock);
        while (working > 0)
                WaitCondExcl(&workDone, &lock);
        ReleaseLkExcl(&lock);
#else
        while (working > 0)
                YieldProcessor();
#endif // STP_SPIN_FOR_RDVZ
}

Worker::Worker() :
        result(WR_FAILURE),
        priv(NULL)
{
}

class WorkerPrivate {
public:
        WorkerPrivate() :
                from(-1),
                to(-1),
                payload(NULL),
                func(NULL),
                process(false),
                terminate(false),
                failed(false),
                threadId(0),
                hThread(NULL),
                barrier(NULL)
        {
        }

        int from;
        int to;
        Payload payload;
        WorkerFunc func;

        bool process;
        bool terminate;
        bool failed;

        DWORD threadId;
        HANDLE hThread;
        Barrier *barrier;
};

static
DWORD WINAPI ThreadProc(LPVOID param)
{
        Worker *wrk = static_cast<Worker *>(param);
        WorkerPrivate *priv = wrk->priv;

        while (true) {
        #ifdef STP_PRN_TPTS
                {
                AnsiString str("Worker waiting: ");
                str += wrk->threadId;
                OutputDebugStringA(str.c_str());
                }
        #endif // STP_PRN_TPTS

                AcquireLkShared(&priv->barrier->lock);
                while (!priv->process && !priv->terminate)
                        WaitCondShared(&priv->barrier->waitForWork, &priv->barrier->lock);
                ReleaseLkShared(&priv->barrier->lock);

                if (priv->terminate)
                        return 0;

                assert(priv->payload != NULL);
                assert(priv->func != NULL);

                const bool ret = priv->func(priv->from, priv->to, priv->payload);
                wrk->result = ret ? WR_SUCCESS : WR_FAILURE;

                priv->process = false;

        #ifdef STP_PRN_TPTS
                {
                AnsiString str("Worker done: ");
                str += wrk->threadId;
                OutputDebugStringA(str.c_str());
                }
        #endif // STP_PRN_TPTS

        #if !STP_SPIN_FOR_RDVZ
                AcquireLkExcl(&priv->barrier->lock);
                priv->barrier->working--;
                if (priv->barrier->working == 0) {
                        ReleaseLkExcl(&priv->barrier->lock);
                        ::WakeConditionVariable(&priv->barrier->workDone);
                } else
                        ReleaseLkExcl(&priv->barrier->lock);
        #else
                ::InterlockedDecrement(&priv->barrier->working);
        #endif // SPIN_FOR_RDVZ
        }

        return 0;
}

ThreadPool::ThreadPool(const long NThreads) :
        m_NThreads(NThreads)
{
        if (m_NThreads < 1)
                throw Exception("Invalid argument");

        m_barrier = new Barrier(m_NThreads);

        m_workers.reserve(NThreads);
        for (int thr = 0; thr < m_NThreads ; thr++) {
                Worker *wrk = new Worker();
                WorkerPrivate *priv = new WorkerPrivate;

                /* The worker must be fully wired up before the thread starts,
                 * since ThreadProc dereferences wrk->priv right away */
                priv->barrier = m_barrier;
                wrk->priv = priv;

                DWORD thrId;
                HANDLE hThread = ::CreateThread(NULL, 0, ThreadProc, wrk, 0, &thrId);
                if (hThread == NULL) {
                        delete priv;
                        delete wrk;
                        Cleanup();

                        throw Exception("Failed to initialize thread pool");
                }

                priv->threadId = thrId;
                priv->hThread = hThread;

                m_workers.push_back(wrk);
        }
}

ThreadPool::~ThreadPool()
{
        Cleanup();

        delete m_barrier;
}

void ThreadPool::Cleanup()
{
        AcquireLkExcl(&m_barrier->lock);
        for (size_t idx = 0; idx < m_workers.size(); idx++)
                m_workers[idx]->priv->terminate = true;
        ReleaseLkExcl(&m_barrier->lock);

        ::WakeAllConditionVariable(&m_barrier->waitForWork);

        for (size_t idx = 0; idx < m_workers.size(); idx++) {
                Worker *wrk = m_workers[idx];

                if (!wrk->priv->failed) {
                        ::WaitForSingleObject(wrk->priv->hThread, INFINITE);
                        ::CloseHandle(wrk->priv->hThread);
                }

                delete wrk->priv;
                delete wrk;
        }
}

template <>
CWorkerVecRef ThreadPool::Process<TPOL_EXCLUSIVE>(const int from, const int to,
                                                  const PayloadVec &payloads, WorkerFunc func) STP_NOTHROW
{
        assert(to > from);
        assert(m_workers.size() == payloads.size());

        const int slice = int(float(to - from) / m_NThreads + 0.5F);

#ifdef STP_PRN_TPTS
        OutputDebugStringA("--- Para start");
#endif // STP_PRN_TPTS

        int idx = from;

        AcquireLkExcl(&m_barrier->lock);
        for (long thr = 0; thr < m_NThreads - 1; thr++) {
                Worker *wrk = m_workers[thr];
                Payload pl = payloads[thr];

                if (idx > to) {
                        SkipWorker(wrk);
                        continue;       /* Mark the remaining workers as skipped too */
                }

                int realTo = idx + slice;
                if (realTo > to)
                        realTo = to;

                PrepareWorker(wrk, idx, realTo, pl, func);
                idx += slice;
        }

        Worker *wrk = m_workers.back();
        Payload pl = payloads.back();
        if (idx <= to)
                PrepareWorker(wrk, idx, to, pl, func);
        else
                SkipWorker(wrk);

        m_barrier->Rendezvous();

        return m_workers;
}

template <>
CWorkerVecRef ThreadPool::Process<TPOL_INCLUSIVE>(const int from, const int to,
                                                  const PayloadVec &payloads, WorkerFunc func) STP_NOTHROW
{
        assert(to >= from);
        assert(m_workers.size() == payloads.size());

        const int slice = int(float(to - from) / m_NThreads + 0.5F);

#ifdef STP_PRN_TPTS
        OutputDebugStringA("--- Para start");
#endif // STP_PRN_TPTS

        int idx = from;

        AcquireLkExcl(&m_barrier->lock);
        for (long thr = 0; thr < m_NThreads - 1; thr++) {
                Worker *wrk = m_workers[thr];
                Payload pl = payloads[thr];

                if (idx > to) {
                        SkipWorker(wrk);
                        continue;       /* Mark the remaining workers as skipped too */
                }

                int realTo = idx + slice;
                if (realTo > to)
                        realTo = to;

                PrepareWorker(wrk, idx, realTo, pl, func);
                idx += slice + 1;
        }

        Worker *wrk = m_workers.back();
        Payload pl = payloads.back();
        if (idx <= to)
                PrepareWorker(wrk, idx, to, pl, func);
        else
                SkipWorker(wrk);

        m_barrier->Rendezvous();

        return m_workers;
}

void ThreadPool::PrepareWorker(Worker *wrk, const int from, const int to,
                               Payload payload, WorkerFunc func)
{
        WorkerPrivate *priv = wrk->priv;

        wrk->result = WR_INVALID;

        priv->from = from;
        priv->to = to;
        priv->payload = payload;
        priv->func = func;
        priv->process = true;

        m_barrier->working++;
}

void ThreadPool::SkipWorker(Worker *wrk)
{
        wrk->result = WR_SKIPPED;
}

long ThreadPool::Threads() const
{
        return m_NThreads;
}

int NumOfCPUs()
{
        SYSTEM_INFO info;

        GetSystemInfo(&info);
        return info.dwNumberOfProcessors;
}

} // namespace stpool
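
For the record, here is a minimal usage sketch. It is not part of the original answer: SumJob, SumChunk and ParallelSum are made-up names for illustration, and only the stpool types and functions come from the listing above. In real code the pool would be created once and reused across calls.

#include <vector>

#include "SThreadPool.h"

/* Hypothetical per-worker payload: a shared read-only input array plus
 * a per-worker partial result. */
struct SumJob {
        const double *input;
        double partial;
};

/* WorkerFunc for TPOL_EXCLUSIVE: sums the indices [from, to). */
static bool SumChunk(const int from, const int to, stpool::Payload p)
{
        SumJob *job = static_cast<SumJob *>(p);
        double acc = 0.0;
        for (int i = from; i < to; i++)
                acc += job->input[i];
        job->partial = acc;
        return true;
}

double ParallelSum(const double *data, const int N)
{
        stpool::ThreadPool pool(stpool::NumOfCPUs());

        /* One payload per worker thread; the jobs vector must not
         * reallocate while the workers hold pointers into it. */
        std::vector<SumJob> jobs(pool.Threads());
        stpool::PayloadVec payloads;
        for (long thr = 0; thr < pool.Threads(); thr++) {
                jobs[thr].input = data;
                jobs[thr].partial = 0.0;
                payloads.push_back(&jobs[thr]);
        }

        /* N must be positive; Process() asserts to > from */
        stpool::CWorkerVecRef workers =
                pool.Process<stpool::TPOL_EXCLUSIVE>(0, N, payloads, SumChunk);

        /* Collect the partial sums from the workers that actually ran */
        double total = 0.0;
        for (size_t idx = 0; idx < workers.size(); idx++) {
                if (workers[idx]->result == stpool::WR_SUCCESS)
                        total += jobs[idx].partial;
        }
        return total;
}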