Немедленное наблюдаемое не срабатывает через Observable.defer - PullRequest
3 голосов
/ 16 июня 2020

Я пытаюсь использовать кеширование Rx JS, чтобы избежать ненужного повторения определенных HTTP-вызовов. Экспериментируя с publishReplay, я получил следующий фрагмент (вдохновленный этим сообщением в блоге ):

let counter = 1;
const updateRequest = Observable.defer(() => mockDataFetch())
  .publishReplay(1, 1000)
  .refCount();

function mockDataFetch() {
  return Observable.of(counter++)
    .delay(0); // <-- delay by 0 milliseconds
}

function mockHttpCache() {
  return updateRequest
    .take(1);
}

setTimeout(() => mockHttpCache().subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => mockHttpCache().subscribe(val => console.log("Response 500:", val)), 500);
setTimeout(() => mockHttpCache().subscribe(val => console.log("Response 1500:", val)), 1500);

Это работает, как ожидалось, и дает результат:

'Response 50:', 1
'Response 500:', 1
'Response 1500:', 2

Однако при удалении .delay(0) из внутреннего наблюдаемого, делая его немедленным, оболочка больше не дает результатов по истечении времени кэширования. Результатом будет:

'Response 50:', 1
'Response 500:', 1

Похоже, что mockDataFetch не вызывается для сбора новых данных, даже если больше нет кешированного элемента. Является ли это предполагаемым поведением, и если да, то каково его обоснование?

1 Ответ

3 голосов
/ 17 июня 2020

Вот ваш код, переведенный в Rx Js 6.5.5, вместе с некоторыми другими небольшими изменениями:

let counter = 1;
const updateRequest = defer(() => mockDataFetch())
  .pipe(
    publishReplay(1, 1000),
    refCount()
  );

function mockDataFetch() {
  console.log('RESUBSCRIBING');

  return of(counter++)
    .pipe(
      // delay(0), // <-- delay by 0 milliseconds
    );
}

function mockHttpCache() {
  return updateRequest
    .pipe(
      take(1),
    );
}

setTimeout(
  () => mockHttpCache().subscribe(val => console.log("Response 50:", val), null, () => console.warn('complete[1]')
), 50);
setTimeout(
  () => mockHttpCache().subscribe(val => console.log("Response 500:", val), null, () => console.warn('complete[2]')
), 500);
setTimeout(
  () => mockHttpCache().subscribe(val => console.log("Response 1500:", val), null, () => console.warn('complete[3]')
), 1500);

StackBlitz .


Без delay(0)

Давайте сначала посмотрим, как реализован publishReplay :

const selector = typeof selectorOrScheduler === 'function' ? selectorOrScheduler : undefined;
const subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);

return (source: Observable<T>) => multicast(() => subject, selector!)(source) as ConnectableObservable<R>;

Как мы видим, он возвращает ConnectableObservable, из-за на multicast:

const connectable: any = Object.create(source, connectableObservableDescriptor);
connectable.source = source;
connectable.subjectFactory = subjectFactory;

return <ConnectableObservable<R>> connectable;

И вот как refCount выглядит так:

// `connectable` - the `ConnectableObservable` from above
constructor(private connectable: ConnectableObservable<T>) { }

// `call` - called when the source is subscribed
// `source` - the `ConnectableObservable` from above
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
  const { connectable } = this;
  (<any> connectable)._refCount++;

  const refCounter = new RefCountSubscriber(subscriber, connectable);
  const subscription = source.subscribe(refCounter);

  if (!refCounter.closed) {
    (<any> refCounter).connection = connectable.connect();
  }

  return subscription;
}

Теперь давайте присмотритесь к ConnectableObservable, особенно к методу subscribe:

// Invoked as a result of `const subscription = source.subscribe(refCounter);` from `refCount`
_subscribe(subscriber: Subscriber<T>) {
  return this.getSubject().subscribe(subscriber);
}

protected getSubject(): Subject<T> {
  const subject = this._subject;
  if (!subject || subject.isStopped) {
    this._subject = this.subjectFactory();
  }
  return this._subject!;
}

Где subjectFactory возвращает экземпляр ReplaySubject. Что в основном происходит на const subscription = source.subscribe(refCounter);, так это то, что RefCounterSubscriber будет добавлен в список активных подписчиков ReplaySubject. A RefCounterSubscriber отслеживает количество подписчиков, и когда их больше нет, он автоматически подписывается на источник при регистрации нового подписчика (при использовании того же экземпляра ReplaySubject).

Далее , Будет вызвано (<any> refCounter).connection = connectable.connect();.

connectable.connect() выполняет следующее:

  connect(): Subscription {
  let connection = this._connection;
  if (!connection) {
    this._isComplete = false;
    connection = this._connection = new Subscription();
    connection.add(this.source
      .subscribe(new ConnectableSubscriber(this.getSubject(), this)));
    if (connection.closed) {
      this._connection = null;
      connection = Subscription.EMPTY;
    }
  }
  return connection;
}

При достижении этих строк:

connection.add(this.source
    .subscribe(new ConnectableSubscriber(this.getSubject(), this)));

источник (например, mockDataFetch()) будет фактически подписан.

Теперь of(counter) реализован следующим образом (примерно):

// In this case, `arr = [counter]`
new Observable(subscriber => {
  for (let i = 0; i < arr.length; i++) {
    subscriber.next(arr[i]);
  }

  subscriber.complete();
});

Это означает, что take(1) будет достигнуто первым, и когда это произойдет, он выдаст это значение, а затем он отправит уведомление complete (в конечном итоге вызывая Subscriber._complete()):

protected _complete(): void {
  this.destination.complete();
  this.unsubscribe();
}

Итак, помимо отправки уведомление complete дальше по цепочке, оно тоже отпишется. В конечном итоге он достигнет RefCounterSubscriber logi c отписки, но не будет работать , как ожидалось , потому что все происходит синхронно . В нормальных обстоятельствах, если ReplaySubject останется без подписчиков, источник будет отписан.

Но так как он остается без подписчиков при подписке на источник , поведение будет немного другое. Список подписчиков ReplaySubject будет пуст, но источник не будет отписан , потому что, как упоминалось выше, он все еще находится в процессе подписки .

В конце концов это означает, что будет вызван subscriber.complete();, который, в свою очередь, вызовет получение ReplaySubject уведомления complete. Но помните, что тот же ReplaySubject будет использоваться, когда источник будет повторно подписан .

В следующий раз, когда он снова подпишется на источник, эти строки будет достигнуто:

const refCounter = new RefCountSubscriber(subscriber, connectable);
// Subscribing to a **completed** Subject
// If the Subject is completed, an EMPTY subscription will be reached
const subscription = source.subscribe(refCounter);

if (!refCounter.closed) { // Since `closed === true`, this block won't be reached
  (<any> refCounter).connection = connectable.connect();
}

// Returning the EMPTY subscription
return subscription;

EMPTY реализация .

Это будет поток программы без delay(0)

setTimeout(
  // Source emits and the value is cached by the subject for 1 second
  // `take(1)` is reached
  // Send the value, then a `complete` notif.
  // But since sending a `complete` notif involves unsubscribing as well
  // The current subscriber will be removed from the `ReplaySubject`'s subscribers list
  // Then, the `ReplaySubject` will receive the `complete` notification and the subject becomes **completed**
  () => mockHttpCache().subscribe(val => console.log("Response 50:", val), null, () => console.warn('complete[1]')
), 50);
setTimeout(
  // Subscribing to a **completed** subject, but because it's a `ReplaySubject`
  // We'd still be getting the cached values, along with a `complete` notification
  () => mockHttpCache().subscribe(val => console.log("Response 500:", val), null, () => console.warn('complete[2]')
), 500);
setTimeout(
  // Since `1`'s time expired at 1 second, the `ReplaySubject` will only send a complete notification
  () => mockHttpCache().subscribe(val => console.log("Response 1500:", val), null, () => console.warn('complete[3]')
), 1500);

Это то, что будет зарегистрировано:

RESUBSCRIBING
Response 50:
1
complete[1]
Response 500:
1
complete[2]
complete[3]

С delay(0)

Это зависит от некоторых деталей, упомянутых в предыдущем разделе.

delay(0) будет планировать действие в AsyncScheduler (по умолчанию) для каждого уведомления nexted. Задача действия - выдать полученное значение после передачи 0 ms. По сути, это то же самое, что и использование setTimeout, что означает, что не будет чем-то синхронным .

Однако при использовании of() уведомление complete будет отправлено синхронно. Вот как с этим справляется delay :

protected _complete() {
  // `this.queue` is populated when a `nexted` value arrives
  if (this.queue.length === 0) {
    this.destination.complete();
  }

  // Unsubscribe from the previous items from the chain
  // What's further will **not** be affected
  this.unsubscribe();
}

Уведомление complete будет отправлено, когда очередь пуста. Но имейте в виду, что это все асинхронно , что означает, что RefCountSubscriber будет вести себя нормально .

Это будет поток программы с delay(0):

setTimeout(
  // Subscribing to the source, which emits a value and a complete notif, synchronously
  // `delay` schedules an action that will do its job in 0ms(still asynchronously)
  // The value is emitted by the `delay`'s scheduled action
  // `take(1)` is reached
  // The value will be passed along then a `complete` notif will be sent
  // Then, the source will be unsubscribed
  // Due to `refCount`, the complete notif that came from the source
  // Won't reach the `ReplaySubject`. as it will already be unsubscribed from the source
  () => mockHttpCache().subscribe(val => console.log("Response 50:", val), null, () => console.warn('complete[1]')
), 50);
setTimeout(
  // Since only `500ms` have passed, this subscriber will receive the cached value (`1`)
  // and a `complete` notification, due to `take(1)`
  // But since `take(1)` operates synchronously, the `RefCountSubscriber` would be closed already, so the source won't be re-subscribed (//1)
  () => mockHttpCache().subscribe(val => console.log("Response 500:", val), null, () => console.warn('complete[2]')
), 500);
setTimeout(
  // `1500ms` passed, since `1000ms` the cache is empty
  // So the `take(1)` operator will receive nothing, meaning that the source
  // will be re-subscribed
  () => mockHttpCache().subscribe(val => console.log("Response 1500:", val), null, () => console.warn('complete[3]')
), 1500);

Вывод:

RESUBSCRIBING
Response 50:
1
complete[1]
Response 500:
1
complete[2]
RESUBSCRIBING
Response 1500:
2
complete[3]

//1, чтобы увидеть, что RefCountSubscriber равно закрыто , вы можете открыть инструменты разработчика в ваш проект SB, нажмите CTRL + P, введите refCount.ts и поместите точку журнала в строку 78 (например: 'refCounter.closed', refCounter.closed):

if (!refCounter.closed) { /* ... */ }

и, если вы закомментируете последний setTimeout(() => {}, 1500), вы должны увидеть что-то вроде этого:

refCounter.closed
false
RESUBSCRIBING
Response 50:
1
complete[1]
Response 500:
1
complete[2]
refCounter.closed // Closed due to `take(1)`
true
...