Какая забавная проблема! Если я понимаю, о чем вы спрашиваете, вот мое решение: создайте класс-оболочку для Observable, который отслеживает подписки, перехватывая как subscribe()
, так и unsubscribe()
. Вот класс оболочки:
export class CountSubsObservable<T> extends Observable<T>{
private _subCount = 0;
private _subCount$: BehaviorSubject<number> = new BehaviorSubject(0);
public subCount$ = this._subCount$.asObservable();
constructor(public source: Observable<T>) {
super();
}
subscribe(
observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void
): Subscription {
this._subCount++;
this._subCount$.next(this._subCount);
let subscription = super.subscribe(observerOrNext as any, error, complete);
const newUnsub: () => void = () => {
if (this._subCount > 0) {
this._subCount--;
this._subCount$.next(this._subCount);
subscription.unsubscribe();
}
}
subscription.unsubscribe = newUnsub;
return subscription;
}
}
Эта оболочка создает вторичную наблюдаемую .subCount$
, на которую можно подписаться, которая будет генерировать каждый раз, когда изменяется число подписок на исходную наблюдаемую. Он выдаст номер, соответствующий текущему количеству подписчиков.
Чтобы использовать его, вы должны создать наблюдаемый источник и затем вызвать new с этим классом для создания оболочки. Например:
const source$ = interval(1000).pipe(take(10));
const myEvent$: CountSubsObservable<number> = new CountSubsObservable(source$);
myEvent$.subCount$.subscribe(numSubs => {
console.log('subCount$ notification! Number of subscriptions is now', numSubs);
if(numSubs === 0) {
// release global resource
} else {
// allocate global resource, if not yet allocated
}
// for a scalable resource usage / load,
// re-configure it, based on numSubs
});
source$.subscribe(result => console.log('result is ', result));
Чтобы увидеть его в использовании, проверьте это Stackblitz .
UPDATE:
Хорошо, как уже упоминалось в комментариях, я немного пытаюсь понять, откуда поступает поток данных. Оглядываясь назад на ваш вопрос, я вижу, что вы предоставляете «интерфейс подписки на события». Если поток данных является потоком CustomType
, как вы подробно описали в своем третьем обновлении выше, то вы можете использовать fromEvent()
из rxjs
для создания наблюдаемого источника, с помощью которого вы бы вызвали предоставленный мною класс-оболочку.
Чтобы показать это, я создал новый Stackblitz . Из этого Stackblitz вот поток CustomType
s и как я бы использовал класс CountedObservable для достижения того, что вы ищете.
class CustomType {
a: string;
}
const dataArray = [
{ a: 'January' },
{ a: 'February' },
{ a: 'March' },
{ a: 'April' },
{ a: 'May' },
{ a: 'June' },
{ a: 'July' },
{ a: 'August' },
{ a: 'September' },
{ a: 'October' },
{ a: 'November' },
{ a: 'December' }
] as CustomType[];
// Set up an arbitrary source that sends a stream of `CustomTypes`, one
// every two seconds by using `interval` and mapping the numbers into
// the associated dataArray.
const source$ = interval(2000).pipe(
map(i => dataArray[i]), // transform the Observable stream into CustomTypes
take(dataArray.length), // limit the Observable to only emit # array elements
share() // turn into a hot Observable.
);
const myEvent$: CountedObservable<CustomType> = new CountedObservable(source$);
myEvent$.onCount.subscribe(newCount => {
console.log('newCount notification! Number of subscriptions is now', newCount);
});
Надеюсь, это поможет.