Безопасна ли эта реализация публикации / подписки? - PullRequest
0 голосов
/ 01 мая 2020

Я работаю над пользовательской системой публикации / подписки для внутрипроцессного взаимодействия. Предположим, что издатели хотят отправить часть данных (сообщение) всем зарегистрированным подписчикам, и все эти объекты могут выполняться в другом потоке. Каждое соединение между издателем и подписчиком реализовано в виде очереди без блокировки shared_ptr<const Message>.

Поскольку мне нужно избегать динамического выделения c, каждый издатель предварительно выделяет пул общих указателей на сообщение. Когда пользователь вызывает Publisher::publish(msg), я получаю сообщение из пула (если доступно), копирую предоставленное пользователем сообщение как *msgptr_from_pool = msg, и pu sh такой общий указатель для всех очередей, подключающихся ко всем зарегистрированным подписчикам. Чтобы получить общий пул сообщений из пула, я проверяю его use_count. Если он один, это означает, что ни один подписчик по-прежнему не использует (читает) сообщение, и я могу смело перерабатывать его. Обратите внимание, что я не удаляю сообщения из пула. Проще говоря, после того, как они были помещены в очереди, их счетчик использования будет> 1.

На стороне подписчика пользователь использует общие указатели сообщений из очереди, но никогда нигде не сохраняет использованный общий указатель. Пользователь обеспечивает обратный вызов вида void(const Message& msg) и никогда не видит базовый общий указатель. Поэтому, как только все подписчики закончили с сообщением, его счетчик использования будет равен единице.

Мой вопрос: учитывая гарантии того, что стандартные мандаты C ++ относительно общих указателей в многопоточном контексте, безопасен ли этот шаблон? В частности, мне нужно иметь гарантию, что когда я увижу, что общий указатель использует счетчик = 1, я могу смело изменять указанное сообщение, потому что никакой другой поток больше не может его прочитать

Какой-то псевдокод для издателя:

template <typename Msg>
class Publisher
{

public:

    typedef std::shared_ptr<Msg> MsgPtr;

    // constructor preallocates the pool, etc..

    bool publish(const Msg& m)
    {
        auto it = std::find_if(_pool.begin(), _pool.end(),
                               [](const MsgPtr& mptr){ return mptr.use_count() == 1; });

        if(it == _pool.end()) return false;

        *(*it) = m;

        for(auto& q : _queues) q->push(*it);

        return true;
    }

private:

    std::vector<MsgPtr> _pool;  // 
    std::vector<LockfreeQueuePtr> _queues; // LockfreeQueuePtr can be a shared_ptr<boost::lockfree::spsc_queue> for instance

}

... и для подписчика

template <typename Msg>
class Subscriber
{

public:

    typedef std::shared_ptr<const Msg> MsgConstPtr;

    // constructor preallocates the pool, etc..

    void consume()
    {
        _queue->consume_all([this](const MsgConstPtr& mptr){ _callback(*mptr); }); 
    }

private:

    std::function<void(const Msg&)> _callback;

    std::vector<LockfreeQueuePtr> _queues; // LockfreeQueuePtr can be a shared_ptr<boost::lockfree::spsc_queue> for instance

}

Очереди распределяются между двумя классами (например, если есть только 1 паб на 1 подпрограмму два вектора очередей содержат только один общий элемент.

...