Как мне разорвать отношения наблюдателя в многопоточном C ++? - PullRequest
11 голосов
/ 10 февраля 2009

У меня есть субъект, который предлагает Subscribe(Observer*) и Unsubscribe(Observer*) клиентам. Субъект работает в своем собственном потоке (из которого он вызывает Notify() для подписанных наблюдателей), а мьютекс защищает свой внутренний список наблюдателей.

Я бы хотел, чтобы клиентский код, который я не контролирую, мог безопасно удалить Observer после того, как он отписался. Как этого достичь?

  • Удержание мьютекса - даже рекурсивный мьютекс - пока я уведомляю наблюдателей не вариант из-за риск тупиковой ситуации.
  • Я мог бы отметить наблюдателя для удаления в Unsubscribe call и удалите его из Темы темы. затем клиенты могут ждать специального Уведомление «Безопасно удалить». это выглядит безопасно, но обременительно для клиентов.

Редактировать

Ниже приведен иллюстративный код. Проблема заключается в том, как предотвратить отмену подписки, когда в разделе «Проблема здесь» находится пункт «Выполнить». Тогда я мог бы перезвонить на удаленный объект. В качестве альтернативы, если я удерживаю мьютекс, а не делаю копию, я могу заблокировать некоторые клиенты.

#include <set>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

using namespace std;
using namespace boost;

class Observer
{
public:
    void Notify() {}
};

class Subject
{
public:
    Subject() : t(bind(&Subject::Run, this))
    {
    }

    void Subscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.insert(o);
    }

    void Unsubscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.erase(o);
    }

    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            set<Observer*> notifyList;
            {
                mutex::scoped_lock l(m);
                notifyList = observers;
            }
            // Problem here
            for_each(notifyList.begin(), notifyList.end(), 
                     mem_fun(&Observer::Notify));
        }
    }

private:
    set<Observer*> observers;
    thread t;
    mutex m;
};

Редактировать

Я не могу уведомить наблюдателей, удерживая мьютекс из-за риска тупика. Наиболее очевидный способ, которым это может произойти - клиент вызывает Subscribe или Unsubscribe изнутри Notify - легко исправить, сделав мьютекс рекурсивным. Более коварным является риск прерывистой блокировки в разных потоках.

Я нахожусь в многопоточной среде, поэтому в любой момент выполнения потока он обычно содержит последовательность блокировок L1, L2, ... Ln. Другая нить будет держать замки K1, K2, ... Km. Правильно написанный клиент гарантирует, что разные потоки всегда будут получать блокировки в одном и том же порядке. Но когда клиенты взаимодействуют с мьютексом моего субъекта - назовите его X - эта стратегия будет нарушена: вызовы подписки / отмены подписки получают блокировки в порядке L1, L2, ... Ln, X. вызовы Notify из моего потока темы получают блокировки порядок X, K1, K2, ... Km. Если какой-либо из Li или Kj может совпасть по какому-либо пути вызова, клиент подвергается периодической тупиковой ситуации с небольшой вероятностью отладки. Поскольку я не контролирую код клиента, я не могу этого сделать.

Ответы [ 9 ]

7 голосов
/ 10 февраля 2009

Unsubscribe () должен быть синхронным, чтобы он не возвращался, пока не будет гарантировано, что Observer больше не будет в списке субъекта. Это единственный способ сделать это безопасно.

ETA (перевод моего комментария к ответу):

Поскольку время не кажется проблемой, возьмите и отпустите мьютекс между уведомлениями каждого наблюдателя. Вы не сможете использовать for_each таким, какой вы есть сейчас, и вам придется проверить итератор, чтобы убедиться, что он все еще действителен.

for ( ... )
{
    take mutex
    check iterator validity
    notify
    release mutex
}

Это будет делать то, что вы хотите.

3 голосов
/ 10 февраля 2009

Можете ли вы изменить подпись Subscribe () и Unsubscribe ()? Замена Observer * чем-то вроде shared_ptr облегчит задачу.

РЕДАКТИРОВАТЬ: Заменить «легкий» на «проще» выше. Для примера того, как это трудно «получить правильно», см. Историю Boost.Signals и принятого , но еще не в распределении Boost.Signals2 (ранее Boost.ThreadSafeSignals) библиотеки.

1 голос
/ 13 апреля 2015

«Идеальное» решение предполагает использование shared_ptr и weak_ptr. Однако для того, чтобы быть универсальным, он также должен учитывать проблему потери Subject до того, как некоторые из его Observer (да, это тоже может случиться).

class Subject {
public:
    void Subscribe(std::weak_ptr<Observer> o);
    void Unsubscribe(std::weak_ptr<Observer> o);

private:
    std::mutex mutex;
    std::set< std::weak_ptr<Observer> > observers;
};

class Observer: boost::noncopyable {
public:
    ~Observer();

    void Notify();

private:
    std::mutex;
    std::weak_ptr<Subject> subject;
};

С помощью этой структуры мы создаем циклический граф, но с разумным использованием weak_ptr, чтобы и Observer, и Subject могли быть уничтожены без координации.

Примечание. Для простоты я предположил, что Observer одновременно наблюдает один Subject, но он может легко наблюдать несколько объектов.


Теперь, похоже, вы застряли с небезопасным управлением памятью. Это довольно сложная ситуация, как вы можете себе представить. В этом случае я бы предложил эксперимент: асинхронный Unsubscribe. Или, по крайней мере, вызов Unsubscribe будет синхронным извне, но будет осуществляться асинхронно.

Идея проста: мы будем использовать очередь событий для достижения синхронизации. То есть:

  • вызов Unsubscribe отправляет событие в очередь (полезная нагрузка Observer*) и затем ждет
  • когда поток Subject обработал событие (я) Unsubscribe, он пробуждает ожидающий поток (ы)

Вы можете использовать либо ожидание занятости, либо переменную условия, я бы посоветовал переменную условия, если производительность не предписывает иное.

Примечание: это решение полностью не учитывает преждевременную смерть Subject.

1 голос
/ 10 февраля 2009

Вы можете создать «очередь на удаление» в типе CSubject. Когда вы удаляете Observer, вы можете вызвать pSubject-> QueueForDelete (pObserver). Затем, когда поток темы находится между уведомлениями, он может безопасно удалить наблюдателей из очереди.

1 голос
/ 10 февраля 2009

Вместо того чтобы клиенты получали уведомление «SafeToDelete», предоставьте им метод IsSubscribeed (Observer *). Код клиента становится:

subject.Unsubscribe( obsever );l
while( subject.IsSubscribed( observer ) ) {
   sleep_some_short_time;   // OS specific sleep stuff
}
delete observer;

, что не слишком обременительно.

1 голос
/ 10 февраля 2009

Будет ли что-то подобное удовлетворительным? Однако по-прежнему небезопасно отписываться от наблюдателя при получении уведомления, для этого вам потребуется интерфейс, подобный тому, который вы упомянули (насколько я могу судить).

Subscribe(Observer *x)
{
    mutex.lock();
    // add x to the list
    mutex.unlock();
}

Unsubscribe(Observer *x)
{
    mutex.lock();
    while (!ok_to_delete)
        cond.wait(mutex);
    // remove x from list
    mutex.unlock();
}

NotifyLoop()
{
    while (true) {
        // wait for something to trigger a notify

        mutex.lock();
        ok_to_delete = false;
        // build a list of observers to notify
        mutex.unlock();

        // notify all observers from the list saved earlier

        mutex.lock();
        ok_to_delete = true;
        cond.notify_all();
        mutex.unlock();
    }
}

Если вы хотите иметь возможность отменить подписку () внутри Notify () - (плохое проектное решение для клиента IMO ...) вы можете добавить идентификатор потока потока уведомителя в ваши данные состав. В функции отмены подписки вы можете проверить этот идентификатор потока по идентификатору текущего потока (большинство потоковых библиотек предоставляют это - например, pthread_self). Если они одинаковы, вы можете продолжить, не дожидаясь условной переменной.

ПРИМЕЧАНИЕ. Если клиент отвечает за удаление наблюдателя, это означает, что вы столкнулись с ситуацией, когда внутри обратного вызова Notify вы отмените подписку и удалите наблюдателя, но по-прежнему выполняете что-то с использованием этого указателя. Это то, о чем клиент должен знать, и удалять его можно только в конце уведомления ().

1 голос
/ 10 февраля 2009

Ммм ... Я не совсем понимаю ваш вопрос, потому что, если клиент вызывает Unsubscribe, вы можете позволить клиенту удалить его (он не используется вами). Однако, если по какой-то причине вы не можете закрыть отношения после того, как клиент отписался от наблюдателя, вы можете добавить «Субъект» новую операцию, чтобы безопасно удалить наблюдателя, или просто чтобы клиенты сигнализировали о том, что они больше не заинтересованы в наблюдателе. .

Переосмысление редактирования : Хорошо, теперь я думаю, что понимаю, в чем ваша проблема. Я думаю, что лучшее решение вашей проблемы заключается в следующем:

  1. У каждого сохраненного элемента наблюдателя должен быть флаг "valid". Этот флаг будет использоваться для уведомления об этом или нет, пока вы находитесь в цикле уведомлений.
  2. Вам нужен мьютекс для защиты доступа к этому «действительному» флагу. Затем операция отмены подписки блокирует мьютекс для флага «valid», устанавливает его в false для выбранного наблюдателя.
  3. Цикл уведомлений также должен блокировать и разблокировать мьютекс действительного флага и воздействовать только на «действительных» наблюдателей.

Учитывая, что операция отмены подписки будет блокировать мьютекс для сброса действующего флага (и что этот конкретный Observer больше не будет использоваться в вашем потоке), код является поточно-ориентированным, и клиенты могут удалить любого наблюдателя в ближайшее время. как отписался вернулся.

0 голосов
/ 11 апреля 2015

Измените observers на map с ключом Observer* и установите значение обертки Observer. Оболочка включает в себя логическое значение volatile, чтобы указать, действительно ли Observer. В методе subscribe объект-оболочка создается в состоянии valid . В методе unsubscribe оболочка помечается как invalid . Notify вызывается для оболочки вместо фактического наблюдателя . Оболочка вызовет Notify для фактического Наблюдателя , если он действителен (все еще подписан)

#include <map>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

using namespace std;
using namespace boost;

class Observer
{
public:
    void Notify() {}
};

class ObserverWrapper : public Observer
{
public:
    Observer* wrappee;
    volatile bool valid;
    ObserverWrapper(Observer* o) 
    {
        wrappee = o;
        valid = true;
    }

    void Notify() 
    {
        if (valid) wrappee->Notify();
    }
}
class Subject
{
public:
    Subject() : t(bind(&Subject::Run, this))
    {
    }

    void Subscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        boost::shared_ptr<ObserverWrapper> sptr(new ObserverWrapper(o));
        observers.insert(pair<Observer*, sptr));
    }

    void Unsubscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.find(o)->second->valid = false;
        observers.erase(o);
    }

    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            vector<ObserverWrapper*> notifyList;
            {
                mutex::scoped_lock l(m);
                boost::copy(observers | boost::adaptors::map_values, std::back_inserter(notifyList));
            }
            // Should be no problem here
            for_each(notifyList.begin(), notifyList.end(), 
                     mem_fun(&ObserverWrapper::Notify));
        }
    }

private:
    map<Observer*, ObserverWrapper*> observers;
    thread t;
    mutex m;
};
0 голосов
/ 11 февраля 2009

Я думаю, что это помогает, если не очень элегантно:

class Subject {
public:
Subject() : t(bind(&Subject::Run, this)),m_key(0)    {    }
void Subscribe(Observer* o) {
    mutex::scoped_lock l(m);
    InternalObserver io( o );
    boost::shared_ptr<InternalObserver> sp(&io);
    observers.insert(pair<int,boost::shared_ptr<InternalObserver>> (MakeKey(o),sp));
}

void Unsubscribe(Observer* o) {
    mutex::scoped_lock l(m);
    observers.find( MakeKey(o) )->second->exists = false;    }

void WaitForSomethingInterestingToHappen() {}
void Run()
{
    for (;;)
    {
        WaitForSomethingInterestingToHappen();
        for( unsigned int i = 0; i < observers.size(); ++ i )
        {
            mutex::scoped_lock l(m);
            if( observers[i]->exists )
            {
                mem_fun(&Observer::Notify);//needs changing
            }
            else
            {
                observers.erase(i);
                --i;
            }
        }
    }
}
private:

int MakeKey(Observer* o) {
    return ++m_key;//needs changeing, sha of the object?
}
class InternalObserver {
public:
    InternalObserver(Observer* o) : m_o( o ), exists( true ) {}
    Observer* m_o;
    bool exists;
};

map< int, boost::shared_ptr<InternalObserver> > observers;
thread t;
mutex m;
int m_key;
};
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...