Как я могу создать наблюдаемую Rx, которая останавливает публикацию событий, когда последний наблюдатель отписывается? - PullRequest
6 голосов
/ 22 сентября 2011

Я создам наблюдаемое (с помощью различных средств) и верну его заинтересованным сторонам, но когда они закончат слушать, я хотел бы разрушить наблюдаемое, чтобы оно не продолжало потреблять ресурсы. Другой способ думать об этом как о создании тем в подсистеме паба. Когда никто больше не подписан на тему, вы больше не хотите хранить тему и ее фильтрацию.

Ответы [ 2 ]

10 голосов
/ 22 сентября 2011

Rx уже имеет оператора для удовлетворения ваших потребностей - ну, на самом деле, два - Publish & RefCount.

Вот как их использовать:

IObservable xs = ...

var rxs = xs.Publish().RefCount();

var sub1 = rxs.Subscribe(x => { });
var sub2 = rxs.Subscribe(x => { });

//later
sub1.Dispose();

//later 
sub2.Dispose();

//The underlying subscription to `xs` is now disposed of.

Простой.

1 голос
/ 22 сентября 2011

Если я понял ваш вопрос, вы хотите создать наблюдаемое таким образом, чтобы, когда все подписчики удалили свою подписку, то есть подписчика больше не было, вы захотите выполнить функцию очистки, которая остановит наблюдаемое от получения дополнительных значений. Если это то, что вы хотите, то вы можете сделать что-то вроде ниже:

//Wrap a disposable
public class WrapDisposable : IDisposable
    {
        IDisposable disp;
        Action act;
        public WrapDisposable(IDisposable _disp, Action _act)
        {
            disp = _disp;
            act = _act;
        }
        void IDisposable.Dispose()
        {
            act();
            disp.Dispose();
        }
    }

    //Observable that we want to clean up after all subs are done
    public static IObservable<long> GenerateObs(out Action cleanup)
    {
        cleanup = () =>
        {
            Console.WriteLine("All subscribers are done. Do clean up");
        };
        return Observable.Interval(TimeSpan.FromSeconds(1));
    }
    //Wrap the observable
    public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone)
    {
        int count = 0;
        return Observable.CreateWithDisposable<T>(ob =>
        {
            var disp = obs.Subscribe(ob);
            Interlocked.Increment(ref count);
            return new WrapDisposable(disp,() =>
            {
                if (Interlocked.Decrement(ref count) == 0)
                {
                    onAllDone();                                                
                }
            });
        });
    }

// Пример использования:

Action cleanup;
var obs = GenerateObs(out cleanup);
var newObs = WrapToClean(obs, cleanup);
newObs.Take(6).Subscribe(Console.WriteLine);
newObs.Take(5).Subscribe(Console.WriteLine);
...