Функция ожидания условной переменной, вызывающая непредвиденное поведение при предоставлении предиката - PullRequest
0 голосов
/ 06 февраля 2019

В качестве учебного упражнения я реализую пул потоков, используя условные переменные.Поток контроллера создает пул потоков, ожидающих сигнала (для атомарной переменной устанавливается значение выше нуля).Когда потоки сигнализируют о пробуждении, выполняют свою работу, а когда последний поток завершается, он сигнализирует о пробуждении основного потока.Поток контроллера блокируется до завершения последнего потока.Затем пул становится доступным для последующего повторного использования.

Время от времени я получал тайм-аут в потоке контроллера, ожидающий, чтобы рабочий сообщил о завершении (вероятно, из-за состояния гонки при уменьшении активного счетчика работы), поэтому в попытке упрочить пул я заменил «wait (lck)» метода wait условной переменной на «wait (lck, Предикат)».Поскольку при этом поведение пула потоков таково, что, по-видимому, оно позволяет уменьшить активный счетчик работы ниже 0 (что является условием для повторного пробуждения потока контроллера) - у меня есть условие гонки.Я прочитал бесчисленные статьи по атомарным переменным, синхронизации, упорядочению памяти, ложным и потерянным пробуждениям по stackoverflow и различным другим сайтам, включил в себя то, что я узнал в меру своих возможностей, и до сих пор не могу на всю жизнь работатьпочему способ, которым я кодировал ожидаемое ожидание, просто не работает.Счетчик должен всегда быть таким же высоким, как количество потоков в пуле (скажем, 8) и равным нулю.Я начал терять веру в себя - просто не должно быть так сложно сделать что-то принципиально простое.Ясно, что здесь мне нужно изучить кое-что еще:)

Учитывая, конечно, что было состояние гонки, я гарантировал, что две переменные, которые управляют пробуждением и завершением пула, являются атомарными, и что обаизменяется только когда защищено с помощью unique_lock.В частности, я позаботился о том, чтобы при запуске запроса к пулу была получена блокировка, счетчик активных потоков был изменен с 0 на 8, разблокирован мьютекс, а затем «messages_all».Поток контроллера будет только тогда пробужден со счетчиком активных потоков, равным нулю, как только последний рабочий поток уменьшит его до этого уровня и уведомит "one_one ".

В рабочем потоке переменная условия будет ожидать и активироваться только тогда, когдачисло активных потоков больше нуля, разблокируйте мьютекс, параллельно продолжайте выполнять работу, предварительно назначенную процессору при создании пула, повторно получить мьютекс и атомарно уменьшить количество активных потоков.Затем он, хотя и предположительно защищенный блокировкой, проверит, был ли последний поток еще активным, и, если это так, снова разблокирует мьютекс и «notify_one», чтобы разбудить контроллер.

Проблема в том, чтоСчетчик активного потока многократно сбрасывается ниже нуля даже после 1 или 2 итераций.Если я протестирую счетчик активных потоков в начале новой рабочей нагрузки, я смогу найти счетчик активных потоков около -6 - это как если бы пулу было разрешено повторно пробуждать поток контроллера до завершения работы.

Учитывая, что счетчик потока и флаг завершения оба являются атомарными переменными и только когда-либо изменяются, в то время как под защитой того же мьютекса я использую последовательное упорядочение памяти для всех обновлений, я просто не могу видеть, как это происходит, и япотерян.

#include <stdafx.h>
#include <Windows.h>

#include <iostream>
#include <thread>
using std::thread;
#include <mutex>
using std::mutex;
using std::unique_lock;
#include <condition_variable>
using std::condition_variable;
#include <atomic>
using std::atomic;
#include <chrono>
#include <vector>
using std::vector;

class IWorkerThreadProcessor
{
public:
    virtual void Process(int) = 0;
};


class MyProcessor : public IWorkerThreadProcessor
{
    int index_ = 0;
public:
    MyProcessor(int index)
    {
        index_ = index;
    }

    void Process(int threadindex)
    {
    for (int i = 0; i < 5000000; i++);
        std::cout << '(' << index_ << ':' << threadindex << ") ";
    }
};


#define MsgBox(x) do{ MessageBox(NULL, x, L"", MB_OK ); }while(false)


class ThreadPool
{
private:
    atomic<unsigned int> invokations_ = 0;

     //This goes negative when using the wait_for with predicate
    atomic<int> threadsActive_ = 0;
    atomic<bool> terminateFlag_ = false;
    vector<std::thread> threads_;
    atomic<unsigned int> poolSize_ = 0;

    mutex mtxWorker_;
    condition_variable cvSignalWork_;
    condition_variable cvSignalComplete_;

public:

    ~ThreadPool()
    {
        TerminateThreads();
    }

    void Init(std::vector<IWorkerThreadProcessor*>& processors)
    {
        unique_lock<mutex> lck2(mtxWorker_);
        threadsActive_ = 0;
        terminateFlag_ = false;

        poolSize_ = processors.size();
        for (int i = 0; i < poolSize_; ++i)
            threads_.push_back(thread(&ThreadPool::launchMethod, this, processors[i], i));
    }

    void ProcessWorkload(std::chrono::milliseconds timeout)
    {
        //Only used to see how many invocations I was getting through before experiencing the issue - sadly it's only one or two
        invocations_++;
        try
        {
        unique_lock<mutex> lck(mtxWorker_);

        //!!!!!! If I use the predicated wait this break will fire !!!!!!
        if (threadsActive_.load() != 0)
        __debugbreak();

        threadsActive_.store(poolSize_);
        lck.unlock();
        cvSignalWork_.notify_all();

        lck.lock();
        if (!cvSignalComplete_.wait_for(
                lck,
                timeout,
                [this] { return threadsActive_.load() == 0; })
                )
        {
            //As you can tell this has taken me through a journey trying to characterise the issue...
            if (threadsActive_ > 0)
                MsgBox(L"Thread pool timed out with still active threads");
            else if (threadsActive_ == 0)
                MsgBox(L"Thread pool timed out with zero active threads");
            else
                MsgBox(L"Thread pool timed out with negative active threads");
            }
        }
        catch (std::exception e)
        {
            __debugbreak();
        }
    }

    void launchMethod(IWorkerThreadProcessor* processor, int threadIndex)
    {
        do
        {
            unique_lock<mutex> lck(mtxWorker_);

            //!!!!!! If I use this predicated wait I see the failure !!!!!!
            cvSignalWork_.wait(
                lck,
                [this] {
                return
                    threadsActive_.load() > 0 ||
                    terminateFlag_.load();
            });


            //!!!!!!!! Does not cause the failure but obviously will not handle
            //spurious wake-ups !!!!!!!!!!
            //cvSignalWork_.wait(lck);

            if (terminateFlag_.load())
                return;

            //Unlock to parallelise the work load
            lck.unlock();
            processor->Process(threadIndex);

            //Re-lock to decrement the work count
            lck.lock();
            //This returns the value before the subtraction so theoretically if the previous value was 1 then we're the last thread going and we can now signal the controller thread to wake.  This is the only place that the decrement happens so I don't know how it could possibly go negative
            if (threadsActive_.fetch_sub(1, std::memory_order_seq_cst) == 1)
            {
                lck.unlock();
                cvSignalComplete_.notify_one();
            }
            else
                lck.unlock();

        } while (true);
    }

    void TerminateThreads()
    {
        try
        {
            unique_lock<mutex> lck(mtxWorker_);
            if (!terminateFlag_)
            {
                terminateFlag_ = true;
                lck.unlock();
                cvSignalWork_.notify_all();

                for (int i = 0; i < threads_.size(); i++)
                    threads_[i].join();
            }
        }
        catch (std::exception e)
        {
            __debugbreak();
        }
    }
};


int main()
{
    std::vector<IWorkerThreadProcessor*> processors;
    for (int i = 0; i < 8; i++)
        processors.push_back(new MyProcessor(i));


    std::cout << "Instantiating thread pool\n";
    auto pool = new ThreadPool;
    std::cout << "Initialisting thread pool\n";
    pool->Init(processors);
    std::cout << "Thread pool initialised\n";

    for (int i = 0; i < 200; i++)
    {
        std::cout << "Workload " << i << "\n";
        pool->ProcessWorkload(std::chrono::milliseconds(500));
        std::cout << "Workload " << i << " complete." << "\n";
    }

    for (auto a : processors)
        delete a;

    delete pool;

    return 0;
}

Ответы [ 2 ]

0 голосов
/ 07 февраля 2019
class ThreadPool
{
private:
    atomic<unsigned int> invokations_ = 0;
    std::atomic<unsigned int> awakenings_ = 0;
    std::atomic<unsigned int> startedWorkloads_ = 0;
    std::atomic<unsigned int> completedWorkloads_ = 0;
    atomic<bool> terminate_ = false;
    atomic<bool> stillFiring_ = false;
    vector<std::thread> threads_;
    atomic<unsigned int> poolSize_ = 0;

    mutex mtx_;
    condition_variable cvSignalWork_;
    condition_variable cvSignalComplete_;

public:

    ~ThreadPool()
    {
        TerminateThreads();
    }


    void Init(std::vector<IWorkerThreadProcessor*>& processors)
    {
        unique_lock<mutex> lck2(mtx_);
        //threadsActive_ = 0;
        terminate_ = false;

        poolSize_ = processors.size();
        for (int i = 0; i < poolSize_; ++i)
            threads_.push_back(thread(&ThreadPool::launchMethod, this, processors[i], i));

        awakenings_ = 0;
        completedWorkloads_ = 0;
        startedWorkloads_ = 0;
        invokations_ = 0;
    }


    void ProcessWorkload(std::chrono::milliseconds timeout)
    {
        try
        {
            unique_lock<mutex> lck(mtx_);
            invokations_++;

            if (startedWorkloads_ != 0)
                __debugbreak();

            if (completedWorkloads_ != 0)
                __debugbreak();

            if (awakenings_ != 0)
                __debugbreak();

            if (stillFiring_)
                __debugbreak();

            stillFiring_ = true;
            lck.unlock();
            cvSignalWork_.notify_all();

            lck.lock();
            if (!cvSignalComplete_.wait_for(
                lck,
                timeout,
                //[this] { return this->threadsActive_.load() == 0; })
                [this] { return completedWorkloads_ == poolSize_ && !stillFiring_; })
                )
            {
                if (completedWorkloads_ < poolSize_)
                {
                    if (startedWorkloads_ < poolSize_)
                        MsgBox(L"Thread pool timed out with some threads unstarted");
                    else if (startedWorkloads_ == poolSize_)
                        MsgBox(L"Thread pool timed out with all threads started but not all completed");
                }
                else
                    __debugbreak();
            }


            if (completedWorkloads_ != poolSize_)
                __debugbreak();

            if (awakenings_ != poolSize_)
                __debugbreak();

            awakenings_ = 0;
            completedWorkloads_ = 0;
            startedWorkloads_ = 0;

        }
        catch (std::exception e)
        {
            __debugbreak();
        }

    }



    void launchMethod(IWorkerThreadProcessor* processor, int threadIndex)
    {
        do
        {
            unique_lock<mutex> lck(mtx_);
            cvSignalWork_.wait(
                lck,
                [this] {
                return
                    (stillFiring_ && (startedWorkloads_ < poolSize_)) ||
                    terminate_;
            });
            awakenings_++;

            if (startedWorkloads_ == 0 && terminate_)
                return;

            if (stillFiring_ && startedWorkloads_ < poolSize_) //guard against spurious wakeup
            {
                startedWorkloads_++;
                if (startedWorkloads_ == poolSize_)
                    stillFiring_ = false;

                lck.unlock();
                processor->Process(threadIndex);
                lck.lock();
                completedWorkloads_++;

                if (completedWorkloads_ == poolSize_)
                {
                    lck.unlock();
                    cvSignalComplete_.notify_one();
                }
                else
                    lck.unlock();
            }
            else
                lck.unlock();

        } while (true);
    }



    void TerminateThreads()
    {
        try
        {
            unique_lock<mutex> lck(mtx_);
            if (!terminate_) //Don't attempt to double-terminate
            {
                terminate_ = true;
                lck.unlock();
                cvSignalWork_.notify_all();

                for (int i = 0; i < threads_.size(); i++)
                    threads_[i].join();
            }
        }
        catch (std::exception e)
        {
            __debugbreak();
        }
    }
};
0 голосов
/ 06 февраля 2019

Я не уверен, если следующее поможет решить проблему, но я думаю, что ошибка, как показано ниже:

Этот

if (!cvSignalComplete_.wait_for(
            lck,
            timeout,
            [this] { return threadsActive_.load() == 0; })
            )

должен быть заменен на

if (!cvSignalComplete_.wait_for(
            lck,
            timeout,
            [&] { return threadsActive_.load() == 0; })
            )

Похоже, что лямбда не имеет доступа к экземпляру класса.Вот некоторая ссылка, чтобы поддержать мой случай.Посмотрите на раздел лямбда-захвата этой страницы .

Редактировать: Еще одно место, которое вы используете для ожидания с лямбдами.

cvSignalWork_.wait(
            lck,
            [this] {
            return
                threadsActive_.load() > 0 ||
                terminateFlag_.load();
        });

Возможно изменитьвсе лямбды, а затем посмотреть, работает ли это?

Причина, по которой я смотрю на лямбду, заключается в том, что это похоже на случай, похожий на ложное пробуждение.Надеюсь, это поможет.

...