Если я понял ваш вопрос, вы хотите создать наблюдаемое таким образом, чтобы, когда все подписчики удалили свою подписку, то есть подписчика больше не было, вы захотите выполнить функцию очистки, которая остановит наблюдаемое от получения дополнительных значений.
Если это то, что вы хотите, то вы можете сделать что-то вроде ниже:
//Wrap a disposable
public class WrapDisposable : IDisposable
{
IDisposable disp;
Action act;
public WrapDisposable(IDisposable _disp, Action _act)
{
disp = _disp;
act = _act;
}
void IDisposable.Dispose()
{
act();
disp.Dispose();
}
}
//Observable that we want to clean up after all subs are done
public static IObservable<long> GenerateObs(out Action cleanup)
{
cleanup = () =>
{
Console.WriteLine("All subscribers are done. Do clean up");
};
return Observable.Interval(TimeSpan.FromSeconds(1));
}
//Wrap the observable
public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone)
{
int count = 0;
return Observable.CreateWithDisposable<T>(ob =>
{
var disp = obs.Subscribe(ob);
Interlocked.Increment(ref count);
return new WrapDisposable(disp,() =>
{
if (Interlocked.Decrement(ref count) == 0)
{
onAllDone();
}
});
});
}
// Пример использования:
Action cleanup;
var obs = GenerateObs(out cleanup);
var newObs = WrapToClean(obs, cleanup);
newObs.Take(6).Subscribe(Console.WriteLine);
newObs.Take(5).Subscribe(Console.WriteLine);