Если да, не совсем уверен, почему / как?
Давайте посмотрим, как определяется share()
:
function shareSubjectFactory() {
return new Subject<any>();
}
return (source: Observable<T>) => refCount()(multicast(shareSubjectFactory)(source)) as Observable<T>;
Прежде всего,
(source: Observable<T>) => refCount()(multicast(shareSubjectFactory)(source))
то же самое, что
(source: Observable<T>) => source.pipe(
multicast(shareSubjectFactory),
refCount()
)
multicast
вернет ConnectableObservable
, которое по-прежнему является Observable
, но, среди прочего, он предоставляет метод connect
.
// Inside `multicast` operator
const connectable: any = Object.create(source, connectableObservableDescriptor);
connectable.source = source;
connectable.subjectFactory = subjectFactory;
return <ConnectableObservable<R>> connectable;
Источник
Еще одна интересная особенность - это то, что когда подписан на , подписчик будет добавлен в список подписчиков Subject
, и основной источник не будет подписан на , пока не будет вызван connect
:
_subscribe(subscriber: Subscriber<T>) {
return this.getSubject().subscribe(subscriber);
}
protected getSubject(): Subject<T> {
const subject = this._subject;
if (!subject || subject.isStopped) {
this._subject = this.subjectFactory();
}
return this._subject!;
}
Например:
const src$ = privateSrc.pipe(
tap(() => console.log('from src')),
share(),
tap(() => console.log('from share()')),
)
Когда src$
подписан:
// Subscriber #1
src$.subscribe(/* ... */)
, подписчик будет добавлен в список подписчиков Subject
и источник src$
, будет подписан. Зачем? Поскольку share
также использует refCount
, который подписывает на источник, если новый подписчик регистрируется, когда не было предыдущих активных подписчиков, и отписывает от источника, если нет больше активных подписчиков.
Давайте посмотрим на другой пример:
const src$ = (new Observable(s => {
console.warn('[SOURCE] SUBSCRIBED')
setTimeout(() => {
s.next(1);
}, 1000);
})).pipe(share());
// First subscriber,
// because it's the first one, `refCount` will to its job and the source will be subscribed
// and this subscriber will be added to the `Subject`'s subscribers list
// note that the source sends the data asynchronously
src$.subscribe(/* ... */)
// The second subscriber
// since the source is already subscribed, `refCount` won't subscribe to it again
// instead, this new subscriber will be added to `Subject`'s list
src$.subscribe(/* ... */)
После 1s
источник отправит значение 1
, а субъект получит это значение и будет отправить его своим зарегистрированным подписчикам.
Это как refCount
делает свое волшебство c:
// When a new subscriber is registered
(<any> connectable)._refCount++;
// `RefCountSubscriber` will make sure that if no more subscribers are left
// the source will be unsubscribed
const refCounter = new RefCountSubscriber(subscriber, connectable);
// Add the subscriber to the `Subject`'s list
const subscription = connectable.subscribe(refCounter);
if (!refCounter.closed) {
(<any> refCounter).connection = connectable.connect();
}
return subscription;
И ConnectableObservable.connect
определяется следующим образом :
connect(): Subscription {
let connection = this._connection;
if (!connection) {
// If the source wasn't subscribed before
this._isComplete = false;
connection = this._connection = new Subscription();
// Subscribing to the source
// Every notification send by the source will be first received by `Subject`
connection.add(this.source
.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
/* ... */
}
return connection;
}
Итак, если у нас есть src$
наблюдаемый объект, на который необходимо подписаться несколько раз в шаблоне, мы можем применить вышеупомянутые концепции.
Однако есть важный аспект, о котором мы должны знать из.
Если наш шаблон выглядит так:
<!-- #1 -->
<div *ngIf="src$ | async"></div>
<!-- ... -->
<!-- #2 -->
<div *ngIf="src$ | async"></div>
и src$
:
src$ = store.pipe(select(/* ... */), share())
, то, если store
уже имеет значение, он будет извлечено синхронно , что означает, что когда #1
будет зарегистрирован, store
будет подписан и отправит это значение, но обратите внимание, что в это время #2
равно еще не подписан, поэтому он ничего не получит.
Если source
асинхронный, то у нас не должно возникнуть проблем, поскольку подписки в шаблоне, скорее всего, будут синхронными .
Но, когда источник синхронный , вы можете решить эту проблему следующим образом:
src$ = store.pipe(
select(/* ... */),
subscribeOn(asyncScheduler),
share()
)
subscribeOn(asyncScheduler)
это примерно то же, что и задержка подписки на источник с setTimeout(() => {}, 0)
. Но это позволяет подписаться на #2
, так что, когда источник, наконец, подписан, оба подписчика получат это значение.