В качестве учебного упражнения я реализую пул потоков, используя условные переменные.Поток контроллера создает пул потоков, ожидающих сигнала (для атомарной переменной устанавливается значение выше нуля).Когда потоки сигнализируют о пробуждении, выполняют свою работу, а когда последний поток завершается, он сигнализирует о пробуждении основного потока.Поток контроллера блокируется до завершения последнего потока.Затем пул становится доступным для последующего повторного использования.
Время от времени я получал тайм-аут в потоке контроллера, ожидающий, чтобы рабочий сообщил о завершении (вероятно, из-за состояния гонки при уменьшении активного счетчика работы), поэтому в попытке упрочить пул я заменил «wait (lck)» метода wait условной переменной на «wait (lck, Предикат)».Поскольку при этом поведение пула потоков таково, что, по-видимому, оно позволяет уменьшить активный счетчик работы ниже 0 (что является условием для повторного пробуждения потока контроллера) - у меня есть условие гонки.Я прочитал бесчисленные статьи по атомарным переменным, синхронизации, упорядочению памяти, ложным и потерянным пробуждениям по stackoverflow и различным другим сайтам, включил в себя то, что я узнал в меру своих возможностей, и до сих пор не могу на всю жизнь работатьпочему способ, которым я кодировал ожидаемое ожидание, просто не работает.Счетчик должен всегда быть таким же высоким, как количество потоков в пуле (скажем, 8) и равным нулю.Я начал терять веру в себя - просто не должно быть так сложно сделать что-то принципиально простое.Ясно, что здесь мне нужно изучить кое-что еще:)
Учитывая, конечно, что было состояние гонки, я гарантировал, что две переменные, которые управляют пробуждением и завершением пула, являются атомарными, и что обаизменяется только когда защищено с помощью unique_lock.В частности, я позаботился о том, чтобы при запуске запроса к пулу была получена блокировка, счетчик активных потоков был изменен с 0 на 8, разблокирован мьютекс, а затем «messages_all».Поток контроллера будет только тогда пробужден со счетчиком активных потоков, равным нулю, как только последний рабочий поток уменьшит его до этого уровня и уведомит "one_one ".
В рабочем потоке переменная условия будет ожидать и активироваться только тогда, когдачисло активных потоков больше нуля, разблокируйте мьютекс, параллельно продолжайте выполнять работу, предварительно назначенную процессору при создании пула, повторно получить мьютекс и атомарно уменьшить количество активных потоков.Затем он, хотя и предположительно защищенный блокировкой, проверит, был ли последний поток еще активным, и, если это так, снова разблокирует мьютекс и «notify_one», чтобы разбудить контроллер.
Проблема в том, чтоСчетчик активного потока многократно сбрасывается ниже нуля даже после 1 или 2 итераций.Если я протестирую счетчик активных потоков в начале новой рабочей нагрузки, я смогу найти счетчик активных потоков около -6 - это как если бы пулу было разрешено повторно пробуждать поток контроллера до завершения работы.
Учитывая, что счетчик потока и флаг завершения оба являются атомарными переменными и только когда-либо изменяются, в то время как под защитой того же мьютекса я использую последовательное упорядочение памяти для всех обновлений, я просто не могу видеть, как это происходит, и япотерян.
#include <stdafx.h>
#include <Windows.h>
#include <iostream>
#include <thread>
using std::thread;
#include <mutex>
using std::mutex;
using std::unique_lock;
#include <condition_variable>
using std::condition_variable;
#include <atomic>
using std::atomic;
#include <chrono>
#include <vector>
using std::vector;
class IWorkerThreadProcessor
{
public:
virtual void Process(int) = 0;
};
class MyProcessor : public IWorkerThreadProcessor
{
int index_ = 0;
public:
MyProcessor(int index)
{
index_ = index;
}
void Process(int threadindex)
{
for (int i = 0; i < 5000000; i++);
std::cout << '(' << index_ << ':' << threadindex << ") ";
}
};
#define MsgBox(x) do{ MessageBox(NULL, x, L"", MB_OK ); }while(false)
class ThreadPool
{
private:
atomic<unsigned int> invokations_ = 0;
//This goes negative when using the wait_for with predicate
atomic<int> threadsActive_ = 0;
atomic<bool> terminateFlag_ = false;
vector<std::thread> threads_;
atomic<unsigned int> poolSize_ = 0;
mutex mtxWorker_;
condition_variable cvSignalWork_;
condition_variable cvSignalComplete_;
public:
~ThreadPool()
{
TerminateThreads();
}
void Init(std::vector<IWorkerThreadProcessor*>& processors)
{
unique_lock<mutex> lck2(mtxWorker_);
threadsActive_ = 0;
terminateFlag_ = false;
poolSize_ = processors.size();
for (int i = 0; i < poolSize_; ++i)
threads_.push_back(thread(&ThreadPool::launchMethod, this, processors[i], i));
}
void ProcessWorkload(std::chrono::milliseconds timeout)
{
//Only used to see how many invocations I was getting through before experiencing the issue - sadly it's only one or two
invocations_++;
try
{
unique_lock<mutex> lck(mtxWorker_);
//!!!!!! If I use the predicated wait this break will fire !!!!!!
if (threadsActive_.load() != 0)
__debugbreak();
threadsActive_.store(poolSize_);
lck.unlock();
cvSignalWork_.notify_all();
lck.lock();
if (!cvSignalComplete_.wait_for(
lck,
timeout,
[this] { return threadsActive_.load() == 0; })
)
{
//As you can tell this has taken me through a journey trying to characterise the issue...
if (threadsActive_ > 0)
MsgBox(L"Thread pool timed out with still active threads");
else if (threadsActive_ == 0)
MsgBox(L"Thread pool timed out with zero active threads");
else
MsgBox(L"Thread pool timed out with negative active threads");
}
}
catch (std::exception e)
{
__debugbreak();
}
}
void launchMethod(IWorkerThreadProcessor* processor, int threadIndex)
{
do
{
unique_lock<mutex> lck(mtxWorker_);
//!!!!!! If I use this predicated wait I see the failure !!!!!!
cvSignalWork_.wait(
lck,
[this] {
return
threadsActive_.load() > 0 ||
terminateFlag_.load();
});
//!!!!!!!! Does not cause the failure but obviously will not handle
//spurious wake-ups !!!!!!!!!!
//cvSignalWork_.wait(lck);
if (terminateFlag_.load())
return;
//Unlock to parallelise the work load
lck.unlock();
processor->Process(threadIndex);
//Re-lock to decrement the work count
lck.lock();
//This returns the value before the subtraction so theoretically if the previous value was 1 then we're the last thread going and we can now signal the controller thread to wake. This is the only place that the decrement happens so I don't know how it could possibly go negative
if (threadsActive_.fetch_sub(1, std::memory_order_seq_cst) == 1)
{
lck.unlock();
cvSignalComplete_.notify_one();
}
else
lck.unlock();
} while (true);
}
void TerminateThreads()
{
try
{
unique_lock<mutex> lck(mtxWorker_);
if (!terminateFlag_)
{
terminateFlag_ = true;
lck.unlock();
cvSignalWork_.notify_all();
for (int i = 0; i < threads_.size(); i++)
threads_[i].join();
}
}
catch (std::exception e)
{
__debugbreak();
}
}
};
int main()
{
std::vector<IWorkerThreadProcessor*> processors;
for (int i = 0; i < 8; i++)
processors.push_back(new MyProcessor(i));
std::cout << "Instantiating thread pool\n";
auto pool = new ThreadPool;
std::cout << "Initialisting thread pool\n";
pool->Init(processors);
std::cout << "Thread pool initialised\n";
for (int i = 0; i < 200; i++)
{
std::cout << "Workload " << i << "\n";
pool->ProcessWorkload(std::chrono::milliseconds(500));
std::cout << "Workload " << i << " complete." << "\n";
}
for (auto a : processors)
delete a;
delete pool;
return 0;
}