Вот ваш код, переведенный в Rx Js 6.5.5
, вместе с некоторыми другими небольшими изменениями:
let counter = 1;
const updateRequest = defer(() => mockDataFetch())
.pipe(
publishReplay(1, 1000),
refCount()
);
function mockDataFetch() {
console.log('RESUBSCRIBING');
return of(counter++)
.pipe(
// delay(0), // <-- delay by 0 milliseconds
);
}
function mockHttpCache() {
return updateRequest
.pipe(
take(1),
);
}
setTimeout(
() => mockHttpCache().subscribe(val => console.log("Response 50:", val), null, () => console.warn('complete[1]')
), 50);
setTimeout(
() => mockHttpCache().subscribe(val => console.log("Response 500:", val), null, () => console.warn('complete[2]')
), 500);
setTimeout(
() => mockHttpCache().subscribe(val => console.log("Response 1500:", val), null, () => console.warn('complete[3]')
), 1500);
StackBlitz .
Без delay(0)
Давайте сначала посмотрим, как реализован publishReplay
:
const selector = typeof selectorOrScheduler === 'function' ? selectorOrScheduler : undefined;
const subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
return (source: Observable<T>) => multicast(() => subject, selector!)(source) as ConnectableObservable<R>;
Как мы видим, он возвращает ConnectableObservable
, из-за на multicast
:
const connectable: any = Object.create(source, connectableObservableDescriptor);
connectable.source = source;
connectable.subjectFactory = subjectFactory;
return <ConnectableObservable<R>> connectable;
И вот как refCount
выглядит так:
// `connectable` - the `ConnectableObservable` from above
constructor(private connectable: ConnectableObservable<T>) { }
// `call` - called when the source is subscribed
// `source` - the `ConnectableObservable` from above
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
const { connectable } = this;
(<any> connectable)._refCount++;
const refCounter = new RefCountSubscriber(subscriber, connectable);
const subscription = source.subscribe(refCounter);
if (!refCounter.closed) {
(<any> refCounter).connection = connectable.connect();
}
return subscription;
}
Теперь давайте присмотритесь к ConnectableObservable
, особенно к методу subscribe
:
// Invoked as a result of `const subscription = source.subscribe(refCounter);` from `refCount`
_subscribe(subscriber: Subscriber<T>) {
return this.getSubject().subscribe(subscriber);
}
protected getSubject(): Subject<T> {
const subject = this._subject;
if (!subject || subject.isStopped) {
this._subject = this.subjectFactory();
}
return this._subject!;
}
Где subjectFactory
возвращает экземпляр ReplaySubject
. Что в основном происходит на const subscription = source.subscribe(refCounter);
, так это то, что RefCounterSubscriber
будет добавлен в список активных подписчиков ReplaySubject
. A RefCounterSubscriber
отслеживает количество подписчиков, и когда их больше нет, он автоматически подписывается на источник при регистрации нового подписчика (при использовании того же экземпляра ReplaySubject
).
Далее , Будет вызвано (<any> refCounter).connection = connectable.connect();
.
connectable.connect()
выполняет следующее:
connect(): Subscription {
let connection = this._connection;
if (!connection) {
this._isComplete = false;
connection = this._connection = new Subscription();
connection.add(this.source
.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
if (connection.closed) {
this._connection = null;
connection = Subscription.EMPTY;
}
}
return connection;
}
При достижении этих строк:
connection.add(this.source
.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
источник (например, mockDataFetch()
) будет фактически подписан.
Теперь of(counter)
реализован следующим образом (примерно):
// In this case, `arr = [counter]`
new Observable(subscriber => {
for (let i = 0; i < arr.length; i++) {
subscriber.next(arr[i]);
}
subscriber.complete();
});
Это означает, что take(1)
будет достигнуто первым, и когда это произойдет, он выдаст это значение, а затем он отправит уведомление complete
(в конечном итоге вызывая Subscriber._complete()
):
protected _complete(): void {
this.destination.complete();
this.unsubscribe();
}
Итак, помимо отправки уведомление complete
дальше по цепочке, оно тоже отпишется. В конечном итоге он достигнет RefCounterSubscriber
logi c отписки, но не будет работать , как ожидалось , потому что все происходит синхронно . В нормальных обстоятельствах, если ReplaySubject
останется без подписчиков, источник будет отписан.
Но так как он остается без подписчиков при подписке на источник , поведение будет немного другое. Список подписчиков ReplaySubject
будет пуст, но источник не будет отписан , потому что, как упоминалось выше, он все еще находится в процессе подписки .
В конце концов это означает, что будет вызван subscriber.complete();
, который, в свою очередь, вызовет получение ReplaySubject
уведомления complete
. Но помните, что тот же ReplaySubject
будет использоваться, когда источник будет повторно подписан .
В следующий раз, когда он снова подпишется на источник, эти строки будет достигнуто:
const refCounter = new RefCountSubscriber(subscriber, connectable);
// Subscribing to a **completed** Subject
// If the Subject is completed, an EMPTY subscription will be reached
const subscription = source.subscribe(refCounter);
if (!refCounter.closed) { // Since `closed === true`, this block won't be reached
(<any> refCounter).connection = connectable.connect();
}
// Returning the EMPTY subscription
return subscription;
EMPTY
реализация .
Это будет поток программы без delay(0)
setTimeout(
// Source emits and the value is cached by the subject for 1 second
// `take(1)` is reached
// Send the value, then a `complete` notif.
// But since sending a `complete` notif involves unsubscribing as well
// The current subscriber will be removed from the `ReplaySubject`'s subscribers list
// Then, the `ReplaySubject` will receive the `complete` notification and the subject becomes **completed**
() => mockHttpCache().subscribe(val => console.log("Response 50:", val), null, () => console.warn('complete[1]')
), 50);
setTimeout(
// Subscribing to a **completed** subject, but because it's a `ReplaySubject`
// We'd still be getting the cached values, along with a `complete` notification
() => mockHttpCache().subscribe(val => console.log("Response 500:", val), null, () => console.warn('complete[2]')
), 500);
setTimeout(
// Since `1`'s time expired at 1 second, the `ReplaySubject` will only send a complete notification
() => mockHttpCache().subscribe(val => console.log("Response 1500:", val), null, () => console.warn('complete[3]')
), 1500);
Это то, что будет зарегистрировано:
RESUBSCRIBING
Response 50:
1
complete[1]
Response 500:
1
complete[2]
complete[3]
С delay(0)
Это зависит от некоторых деталей, упомянутых в предыдущем разделе.
delay(0)
будет планировать действие в AsyncScheduler
(по умолчанию) для каждого уведомления nexted
. Задача действия - выдать полученное значение после передачи 0 ms
. По сути, это то же самое, что и использование setTimeout
, что означает, что не будет чем-то синхронным .
Однако при использовании of()
уведомление complete
будет отправлено синхронно. Вот как с этим справляется delay
:
protected _complete() {
// `this.queue` is populated when a `nexted` value arrives
if (this.queue.length === 0) {
this.destination.complete();
}
// Unsubscribe from the previous items from the chain
// What's further will **not** be affected
this.unsubscribe();
}
Уведомление complete
будет отправлено, когда очередь пуста. Но имейте в виду, что это все асинхронно , что означает, что RefCountSubscriber
будет вести себя нормально .
Это будет поток программы с delay(0)
:
setTimeout(
// Subscribing to the source, which emits a value and a complete notif, synchronously
// `delay` schedules an action that will do its job in 0ms(still asynchronously)
// The value is emitted by the `delay`'s scheduled action
// `take(1)` is reached
// The value will be passed along then a `complete` notif will be sent
// Then, the source will be unsubscribed
// Due to `refCount`, the complete notif that came from the source
// Won't reach the `ReplaySubject`. as it will already be unsubscribed from the source
() => mockHttpCache().subscribe(val => console.log("Response 50:", val), null, () => console.warn('complete[1]')
), 50);
setTimeout(
// Since only `500ms` have passed, this subscriber will receive the cached value (`1`)
// and a `complete` notification, due to `take(1)`
// But since `take(1)` operates synchronously, the `RefCountSubscriber` would be closed already, so the source won't be re-subscribed (//1)
() => mockHttpCache().subscribe(val => console.log("Response 500:", val), null, () => console.warn('complete[2]')
), 500);
setTimeout(
// `1500ms` passed, since `1000ms` the cache is empty
// So the `take(1)` operator will receive nothing, meaning that the source
// will be re-subscribed
() => mockHttpCache().subscribe(val => console.log("Response 1500:", val), null, () => console.warn('complete[3]')
), 1500);
Вывод:
RESUBSCRIBING
Response 50:
1
complete[1]
Response 500:
1
complete[2]
RESUBSCRIBING
Response 1500:
2
complete[3]
//1
, чтобы увидеть, что RefCountSubscriber
равно закрыто , вы можете открыть инструменты разработчика в ваш проект SB, нажмите CTRL + P
, введите refCount.ts
и поместите точку журнала в строку 78 (например: 'refCounter.closed', refCounter.closed
):
if (!refCounter.closed) { /* ... */ }
и, если вы закомментируете последний setTimeout(() => {}, 1500)
, вы должны увидеть что-то вроде этого:
refCounter.closed
false
RESUBSCRIBING
Response 50:
1
complete[1]
Response 500:
1
complete[2]
refCounter.closed // Closed due to `take(1)`
true