«Промежуточные IObservables» без конечных подписчиков сохраняются в памяти на время жизни корневого IObservable - PullRequest
5 голосов
/ 16 марта 2012

Например, рассмотрим это:

    public IDisposable Subscribe<T>(IObserver<T> observer)
    {
        return eventStream.Where(e => e is T).Cast<T>().Subscribe(observer);
    }

eventStream является долгоживущим источником событий. Короткоживущий клиент будет использовать этот метод для подписки на определенный период времени, а затем отписаться, позвонив Dispose на возвращенный IDisposable.

Однако, хотя eventStream все еще существует и должен храниться в памяти, с помощью этого метода было создано 2 новых IObservables - возвращенного методом Where(), который предположительно хранится в памяти eventStream, и тот, который возвращен методом Cast<T>(), который предположительно хранится в памяти тем, который возвращен методом Where().

Как эти «промежуточные IObservables» (как их лучше назвать?) Будут очищены? Или же они будут существовать в течение всего срока жизни eventStream, даже если у них больше нет подписок, и никто больше не ссылается на них, кроме как на их источник IObservable, и поэтому никогда больше не будет подписок?

Если они убираются, сообщая родителям, что у них больше нет подписок, как они узнают, что ничто другое не ссылается на них и может в какой-то момент подписаться на них?

Ответы [ 5 ]

2 голосов
/ 16 марта 2012

Однако, хотя eventStream все еще существует и должен храниться в памяти, с помощью этого метода было создано 2 новых IObservable - одна, возвращаемая методом Where (), которая предположительно хранится в памяти с помощью eventStream, и одна возвращается методом Cast (), который предположительно хранится в памяти тем, который возвращается методом Where ().

У вас есть это назад. Давайте пройдемся по цепочке происходящего.

IObservable<T> eventStream; //you have this defined and assigned somewhere

public IDisposable Subscribe<T>(IObserver<T> observer)
{
    //let's break this method into multiple lines

    IObservable<T> whereObs = eventStream.Where(e => e is T);
    //whereObs now has a reference to eventStream (and thus will keep it alive), 
    //but eventStream knows nothing of whereObs (thus whereObs will not be kept alive by eventStream)
    IObservable<T> castObs = whereObs.Cast<T>();
    //as with whereObs, castObs has a reference to whereObs,
    //but no one has a reference to castObs
    IDisposable ret = castObs.Subscribe(observer);
    //here is where it gets tricky.
    return ret;
}

То, на что ret ссылается или не имеет ссылки, зависит от реализации различных наблюдаемых. Из того, что я видел в Reflector в библиотеке Rx и в операторах, которые я написал сам, большинство операторов не возвращают расходные материалы, имеющие ссылку на сам наблюдаемый оператор.

Например, базовая реализация Where будет выглядеть примерно так (набирается непосредственно в редакторе, без обработки ошибок)

IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> filter)
{
    return Observable.Create<T>(obs =>
      {
         return source.Subscribe(v => if (filter(v)) obs.OnNext(v),
                                 obs.OnError, obs.OnCompleted);
      }
}

Обратите внимание, что возвращаемое одноразовое использование будет иметь ссылку на функцию фильтра через созданного наблюдателя, но не будет иметь ссылку на наблюдаемое Where. Cast может быть легко реализовано с использованием того же шаблона. По сути, операторы становятся фабриками-наблюдателями.

Смысл всего этого для рассматриваемого вопроса заключается в том, что промежуточные IObservables могут собираться мусором к концу метода . Функция фильтра, переданная в Where, остается неизменной в течение всего срока действия подписки, но после удаления или завершения подписки остается только eventStream (при условии, что она еще жива).

РЕДАКТИРОВАТЬ для комментария суперкатера, давайте посмотрим, как компилятор мог бы переписать это или как бы вы реализовали это без замыканий.

class WhereObserver<T> : IObserver<T>
{
    WhereObserver<T>(IObserver<T> base, Func<T, bool> filter)
    {
        _base = base;
        _filter = filter;
    }

    IObserver<T> _base;
    Func<T, bool> _filter;

    void OnNext(T value)
    {
        if (filter(value)) _base.OnNext(value);
    }

    void OnError(Exception ex) { _base.OnError(ex); }
    void OnCompleted() { _base.OnCompleted(); }
}

class WhereObservable<T> : IObservable<T>
{
    WhereObservable<T>(IObservable<T> source, Func<T, bool> filter)
    {
        _source = source;
        _filter = filter;
    }

    IObservable<T> source;
    Func<T, bool> filter;

    IDisposable Subscribe(IObserver<T> observer)
    {
        return source.Subscribe(new WhereObserver<T>(observer, filter));
    }
}

static IObservable<T> Where(this IObservable<T> source, Func<T, bool> filter)
{
    return new WhereObservable(source, filter);
}

Вы можете видеть, что наблюдателю не нужна ссылка на наблюдаемую, которая его сгенерировала, и наблюдаемой не нужно отслеживать наблюдателей, которых он создает. Мы даже не сделали никаких новых идентификаторов для возврата из нашей подписки.

В действительности Rx имеет несколько реальных классов для анонимных наблюдаемых / наблюдателей, которые принимают делегатов и перенаправляют вызовы интерфейса этим делегатам. Он использует замыкания для создания этих делегатов. Компилятору не нужно выдавать классы, которые фактически реализуют интерфейсы, но дух перевода остается прежним.

1 голос
/ 18 марта 2012

Вы должны помнить, что IObserable<T> (например, IEnumerable<T>) - это ленивые списки.Они не существуют, пока кто-то не попытается получить доступ к элементам путем подписки или итерации.

Когда вы пишете list.Where(x => x > 0), вы не создаете новый список, вы просто определяете, как будет выглядеть новый список, если кто-топытается получить доступ к элементам.

Это очень важное различие.

Вы можете считать, что есть два разных IObservables.Одним из них является определение и подписанные экземпляры.

Определения IObservable используют рядом с отсутствием памяти.Ссылки могут быть свободно распространены.Они будут полностью очищены от мусора.

Подписанные экземпляры существуют, только если кто-то подписан.Они могут использовать значительную память.Если вы не используете расширения .Publish, вы не можете делиться ссылками.Когда подписка заканчивается или завершается с помощью вызова .Dispose(), память очищается.

Для каждой новой подписки создается новый набор подписанных экземпляров.Когда окончательная дочерняя подписка удаляется, вся цепочка удаляется.Они не могут быть разделены.Если есть вторая подписка, создается полная цепочка подписанных экземпляров, независимо от первой.

Надеюсь, это поможет.

1 голос
/ 17 марта 2012

Я думаю, что пришел к выводу с помощью ответа Гедеона и разбил образец Where метод:

Я неправильно предположил, что на каждый нисходящий поток IObservable всегда ссылалась восходящий поток (для того, чтобы проталкивать события вниз, когда это необходимо). Но это укоренило бы нисходящие потоки в памяти на время существования восходящего потока.

Фактически, на каждый восходящий поток IObservable ссылается нисходящий поток IObservable (ожидание, готовое к подключению IObserver, когда потребуется). Это укореняет восходящие потоки в памяти до тех пор, пока на них ссылаются нисходящие (что имеет смысл, поскольку в то время как нисходящий поток еще где-то ссылается, подписка может происходить в любое время).

Однако, когда подписка действительно происходит, эта ссылочная цепочка между восходящими и нисходящими потоками формируется, но только на объектах реализации IDisposable, которые управляют подписками на каждой наблюдаемой стадии, и только в течение срока действия этой подписки. (что также имеет смысл - хотя подписка существует, каждая восходящая «логика обработки» должна все еще храниться в памяти, чтобы обрабатывать события, проходящие через нее, чтобы достичь конечного подписчика IObserver).

Это дает решение обеих проблем - при ссылке на IObservable он будет хранить весь источник (восходящий поток) IObservables в памяти, готовый для подписки. И хотя подписка существует, она будет хранить все последующие подписки в памяти, позволяя окончательной подписке по-прежнему получать события, даже если на ее источник IObservable больше нельзя ссылаться.

Применяя это к моему примеру в моем вопросе, наблюдаемые нисходящие потоки Where и Cast очень недолговечны - на них ссылаются вплоть до завершения вызова Subscribe(observer). Они тогда свободны быть собранными. Тот факт, что промежуточные наблюдаемые теперь могут быть собраны, не вызывает проблем для только что созданной подписки, поскольку она сформировала свою собственную цепочку объектов подписки (вверх по течению -> вниз по течению), которая укоренена наблюдаемым источником eventStream. Эта цепочка будет выпущена, как только каждый последующий этап выберет свой IDisposable трекер подписки.

0 голосов
/ 16 марта 2012

Если объект подписывается на события, будь то для собственного использования или с целью пересылки их другим объектам, издатель этих событий, как правило, поддерживает его, даже если никто другой этого не сделает. Если я правильно понимаю вашу ситуацию, у вас есть объекты, которые подписываются на события с целью пересылки их нулю или большему количеству других подписчиков. Я бы посоветовал вам по возможности разработать промежуточные IObservables, чтобы они не подписывались на событие от своего родителя до тех пор, пока кто-то не подпишется на событие от них, и они откажутся от подписки на событие своего родителя в любое время, когда последний подписчик откажется от подписки. Будет ли это практичным, будет зависеть от потоковых контекстов родительского и дочернего IObservables. Также отметим, что (опять же, в зависимости от контекста потоков) блокировка может потребоваться в случае, когда новый подписчик присоединяется примерно в то же время, что (что было бы), когда последний подписчик выходит. Даже при том, что большинство сценариев подписки и отписки большинства объектов могут обрабатываться с использованием CompareExchange, а не блокировок, это часто невозможно в сценариях, включающих взаимосвязанные списки подписок.

Если ваш объект будет получать подписки и отписки от своих дочерних элементов в контексте потоков, который не совместим с родительской подпиской и методами отмены подписки (IMHO, IObservable должен был требовать, чтобы все допустимые реализации разрешали подписку и отмену подписки из произвольного контекста потоков (но, увы, нет) у вас может не быть иного выбора, кроме как иметь промежуточную IObservable сразу после создания, создать прокси-объект для обработки подписок от вашего имени и заставить этот объект подписаться на событие родителя. Затем добавьте свой собственный объект (на который у прокси будет только слабая ссылка) и добавьте финализатор, который уведомит прокси о том, что ему нужно будет отписаться, когда это разрешит контекст потоков своего родителя. Было бы неплохо, чтобы ваш прокси-объект отказывался от подписки, когда его последний подписчик завершает свою работу, но если новый подписчик может присоединиться и ожидать, что его подписка вступит в силу немедленно, возможно, придется сохранять подписку прокси-сервера до тех пор, пока кто-либо удерживает ссылку на промежуточный наблюдатель, который может быть использован для запроса новой подписки.

0 голосов
/ 16 марта 2012

Класс, реализующий IObservable, является просто обычным объектом.Он будет очищен при запуске GC и не увидит никаких ссылок на него.Это не что иное, как "когда очищается new object()".За исключением использования памяти, ваша программа не должна видеть, очищены ли они.

...