Вообще говоря, концепция количества абонентов для произвольной наблюдаемой последовательности не определена.
Для холодных наблюдаемых, таких как Observable.Interval
, каждый раз, когда вы подписываетесь на наблюдаемую, создается новый экземпляр конвейера, который - с его точки зрения, видит одновременно только одного наблюдателя.
Тем не менее, мы можем разогреть наблюдаемую холодную ситуацию, и приходят подписки на часы и go.
public static IObservable<T> RefCount<T>(this IObservable<T> source, Action<int> onChange)
{
var subscribers = 0;
var shared = source.Publish().RefCount();
void callback(int count) => onChange(Interlocked.Add(ref subscribers, count));
return Observable.Create<T>(observer =>
{
callback(+1);
var subscription = shared.Subscribe(observer);
var dispose = Disposable.Create(() => callback(-1));
return new CompositeDisposable(subscription, dispose);
});
}
Демо
var values =
Observable
.Interval(TimeSpan.FromSeconds(0.1))
.RefCount(count => Console.WriteLine($"Subscribers: {count}"));
values.Take(5).Subscribe();
values.Take(10).Subscribe();
values.Take(15).Subscribe();
Вывод
Subscribers: 1
Subscribers: 2
Subscribers: 3
Subscribers: 2
Subscribers: 1
Subscribers: 0
Теперь это работает, потому что у нас есть общее представление о родительской наблюдаемой. Поэтому постарайтесь, чтобы все подписки указывали на один и тот же экземпляр.
_articles = GetArticles().RefCount(count => Console.WriteLine($"Subscribers: {count}")));
...
_articles.Subscribe();