Я решил, что мне нужно написать свой собственный планировщик. Однако фактический объем работ был минимальным. Планировщик boost::fibers::shared_work
управляет списком волокон, которые совместно используются потоками, используя одну статическую очередь, защищенную статическим мьютексом. Существует еще одна очередь, которая управляет основным волокном для каждого потока (поскольку каждый поток имеет свой собственный планировщик), но это локально для экземпляра класса, а не совместно используется всеми экземплярами класса, как статические члены.
Тогда, чтобы предотвратить совместное использование статической очереди и блокировки между отдельными наборами потоков, нужно поместить, в основном бесполезный, параметр шаблона перед классом. Затем каждый поток передает свой параметр в этот шаблон. Таким образом, поскольку вы получаете разные объекты для каждой специализации шаблона, вы получаете различный набор статических переменных для каждого экземпляра с другим номером пула.
Ниже приведена моя реализация этого решения (в основном это копия boost::fiber::shared_work
с несколькими более четкими именами переменных и типов и добавленным параметром шаблона).
#include <condition_variable>
#include <chrono>
#include <deque>
#include <mutex>
#include <boost/config.hpp>
#include <boost/fiber/algo/algorithm.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/scheduler.hpp>
#include <boost/assert.hpp>
#include "boost/fiber/type.hpp"
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
#ifdef _MSC_VER
# pragma warning(push)
# pragma warning(disable:4251)
#endif
/*!
* @class SharedWorkPool
* @brief A scheduler for boost::fibers that operates in a manner similar to the
* shared work scheduler, except that it takes a template parameter determining
* which pool to draw fibers from. In this fashion, one group of threads can share
* a pool of fibers among themselves while another group of threads can work with
* a completely separate pool
* @tparam PoolNumber The index of the pool number for this thread
*/
template <int PoolNumber>
class SharedWorkPool : public boost::fibers::algo::algorithm
{
typedef std::deque<boost::fibers::context * > ReadyQueueType;
typedef boost::fibers::scheduler::ready_queue_type LocalQueueType;
typedef std::unique_lock<std::mutex> LockType;
public:
SharedWorkPool() = default;
~SharedWorkPool() override {}
SharedWorkPool( bool suspend) : suspendable{suspend}{}
SharedWorkPool( SharedWorkPool const&) = delete;
SharedWorkPool( SharedWorkPool &&) = delete;
SharedWorkPool& operator=(const SharedWorkPool&) = delete;
SharedWorkPool& operator=(SharedWorkPool&&) = delete;
void awakened(boost::fibers::context* ctx) noexcept override;
boost::fibers::context* pick_next() noexcept override;
bool has_ready_fibers() const noexcept override
{
LockType lock{readyQueueMutex};
return ((!readyQueue.empty()) || (!localQueue.empty()));
}
void suspend_until(const std::chrono::steady_clock::time_point& timePoint) noexcept override;
void notify() noexcept override;
private:
static ReadyQueueType readyQueue;
static std::mutex readyQueueMutex;
LocalQueueType localQueue{};
std::mutex instanceMutex{};
std::condition_variable suspendCondition{};
bool waitNotifyFlag{false};
bool suspendable{false};
};
template <int PoolNumber>
void SharedWorkPool<PoolNumber>::awakened(boost::fibers::context* ctx) noexcept
{
if(ctx->is_context(boost::fibers::type::pinned_context))
{ // we have been passed the thread's main fiber, never put those in the shared queue
localQueue.push_back(*ctx);
}
else
{//worker fiber, enqueue on shared queue
ctx->detach();
LockType lock{readyQueueMutex};
readyQueue.push_back(ctx);
}
}
template <int PoolNumber>
boost::fibers::context* SharedWorkPool<PoolNumber>::pick_next() noexcept
{
boost::fibers::context * ctx = nullptr;
LockType lock{readyQueueMutex};
if(!readyQueue.empty())
{ //pop an item from the ready queue
ctx = readyQueue.front();
readyQueue.pop_front();
lock.unlock();
BOOST_ASSERT( ctx != nullptr);
boost::fibers::context::active()->attach( ctx); //attach context to current scheduler via the active fiber of this thread
}
else
{
lock.unlock();
if(!localQueue.empty())
{ //nothing in the ready queue, return main or dispatcher fiber
ctx = & localQueue.front();
localQueue.pop_front();
}
}
return ctx;
}
template <int PoolNumber>
void SharedWorkPool<PoolNumber>::suspend_until(const std::chrono::steady_clock::time_point& timePoint) noexcept
{
if(suspendable)
{
if (std::chrono::steady_clock::time_point::max() == timePoint)
{
LockType lock{instanceMutex};
suspendCondition.wait(lock, [this](){return waitNotifyFlag;});
waitNotifyFlag = false;
}
else
{
LockType lock{instanceMutex};
suspendCondition.wait_until(lock, timePoint, [this](){return waitNotifyFlag;});
waitNotifyFlag = false;
}
}
}
template <int PoolNumber>
void SharedWorkPool<PoolNumber>::notify() noexcept
{
if(suspendable)
{
LockType lock{instanceMutex};
waitNotifyFlag = true;
lock.unlock();
suspendCondition.notify_all();
}
}
template <int PoolNumber>
std::deque<boost::fibers::context*> SharedWorkPool<PoolNumber>::readyQueue{};
template <int PoolNumber>
std::mutex SharedWorkPool<PoolNumber>::readyQueueMutex{};
Обратите внимание, я не совсем уверен, что произойдет, если вы попытаетесь использовать один и тот же номер пула из объявлений в разных единицах компиляции. Однако при нормальных обстоятельствах, т. Е. Вы записали boost::fibers::use_scheduling_algorithm< Threads::Fibers::SharedWorkPool<WorkPoolNumber> >();
только в одном месте для каждого WorkPoolNumber
, это работает отлично. Волокна, назначенные данному набору потоков, всегда работают в одном и том же наборе потоков и никогда не запускаются другим набором потоков.