Оператор обмена RxJs и Observable созданы с диапазоном - PullRequest
0 голосов
/ 09 февраля 2019

Я пытаюсь понять, почему оператор share RxJs работает по-разному, если источник Observable создается с range вместо timer .

Изменение исходного кода на:

const source = range(1, 1)
    .pipe(
        share()
    )

const example = source.pipe(
    tap(() => console.log('***SIDE EFFECT***')),
    mapTo('***RESULT***'),
)

const sharedExample = example
const subscribeThree = sharedExample.subscribe(val => console.log(val))
const subscribeFour = sharedExample.subscribe(val => console.log(val))

Результат:

console.log src / pipeline / foo.spec.ts: 223 ПОБОЧНЫЙ ЭФФЕКТ

console.log src / pipeline / foo.spec.ts: 228 РЕЗУЛЬТАТ

console.log src / pipe/foo.spec.ts:223 ПОБОЧНЫЙ ЭФФЕКТ

console.log src / pipeline / foo.spec.ts: 229 РЕЗУЛЬТАТ

По сути, побочный эффект вызывается более одного раза.

Насколько я знаю, range предполагается наблюдаемым при холоде, но говорятчто share должно превратить холодные наблюдаемые в горячие.

Чем объясняется такое поведение?

1 Ответ

0 голосов
/ 10 февраля 2019

Следует отметить две вещи.

Во-первых, если вы внимательно посмотрите на сигнатуру функции для range, вы увидите, что она принимает третий параметр, SchedulerLike.

* 1006.* Если не указано, RxJS вызывает обработчик next каждого подписчика немедленно с соответствующим значением для наблюдаемой range до тех пор, пока она не будет исчерпана.Это нежелательно, если вы намереваетесь использовать оператор share, поскольку он эффективно обходит любую общую обработку побочных эффектов, которая может быть введена.

Соответствующий фрагмент, взятый из фактической реализации:

// src/internal/observable/range.ts#L53
do {
  if (index++ >= count) {
    subscriber.complete();
    break;
  }
  subscriber.next(current++);
  if (subscriber.closed) {
    break;
  }
} while (true);

timer также принимает необязательный аргумент SchedulerLike.Если не указано, реализация принимает AsyncScheduler по умолчанию, что отличается от значения по умолчанию для range.

Во-вторых, оператор share должен следовать за всеми другими операторами, которые могут иметь побочные эффекты,Если он предшествует им, то ожидаемое объединяющее поведение обработки оператором канала будет потеряно.

Таким образом, с учетом обеих точек зрения, чтобы оператор share работал с range, как вы ожидаете:

const { asyncScheduler, range, timer } = rxjs;
const { mapTo, tap, share } = rxjs.operators;

// Pass in an `AsyncScheduler` to prevent immediate `next` handler calls
const source = range(1, 1, asyncScheduler).pipe(
  tap(() => console.log('***SIDE EFFECT***')),
  mapTo('***RESULT***'),
  // All preceding operators will be in shared processing
  share(),
);

const sub3 = source.subscribe(console.log);
const sub4 = source.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...