Я работаю над пользовательской системой публикации / подписки для внутрипроцессного взаимодействия. Предположим, что издатели хотят отправить часть данных (сообщение) всем зарегистрированным подписчикам, и все эти объекты могут выполняться в другом потоке. Каждое соединение между издателем и подписчиком реализовано в виде очереди без блокировки shared_ptr<const Message>
.
Поскольку мне нужно избегать динамического выделения c, каждый издатель предварительно выделяет пул общих указателей на сообщение. Когда пользователь вызывает Publisher::publish(msg)
, я получаю сообщение из пула (если доступно), копирую предоставленное пользователем сообщение как *msgptr_from_pool = msg
, и pu sh такой общий указатель для всех очередей, подключающихся ко всем зарегистрированным подписчикам. Чтобы получить общий пул сообщений из пула, я проверяю его use_count
. Если он один, это означает, что ни один подписчик по-прежнему не использует (читает) сообщение, и я могу смело перерабатывать его. Обратите внимание, что я не удаляю сообщения из пула. Проще говоря, после того, как они были помещены в очереди, их счетчик использования будет> 1.
На стороне подписчика пользователь использует общие указатели сообщений из очереди, но никогда нигде не сохраняет использованный общий указатель. Пользователь обеспечивает обратный вызов вида void(const Message& msg)
и никогда не видит базовый общий указатель. Поэтому, как только все подписчики закончили с сообщением, его счетчик использования будет равен единице.
Мой вопрос: учитывая гарантии того, что стандартные мандаты C ++ относительно общих указателей в многопоточном контексте, безопасен ли этот шаблон? В частности, мне нужно иметь гарантию, что когда я увижу, что общий указатель использует счетчик = 1, я могу смело изменять указанное сообщение, потому что никакой другой поток больше не может его прочитать
Какой-то псевдокод для издателя:
template <typename Msg>
class Publisher
{
public:
typedef std::shared_ptr<Msg> MsgPtr;
// constructor preallocates the pool, etc..
bool publish(const Msg& m)
{
auto it = std::find_if(_pool.begin(), _pool.end(),
[](const MsgPtr& mptr){ return mptr.use_count() == 1; });
if(it == _pool.end()) return false;
*(*it) = m;
for(auto& q : _queues) q->push(*it);
return true;
}
private:
std::vector<MsgPtr> _pool; //
std::vector<LockfreeQueuePtr> _queues; // LockfreeQueuePtr can be a shared_ptr<boost::lockfree::spsc_queue> for instance
}
... и для подписчика
template <typename Msg>
class Subscriber
{
public:
typedef std::shared_ptr<const Msg> MsgConstPtr;
// constructor preallocates the pool, etc..
void consume()
{
_queue->consume_all([this](const MsgConstPtr& mptr){ _callback(*mptr); });
}
private:
std::function<void(const Msg&)> _callback;
std::vector<LockfreeQueuePtr> _queues; // LockfreeQueuePtr can be a shared_ptr<boost::lockfree::spsc_queue> for instance
}
Очереди распределяются между двумя классами (например, если есть только 1 паб на 1 подпрограмму два вектора очередей содержат только один общий элемент.