вариация CombinLatest, которая завершается, когда завершается первая из наблюдаемых, переданных в качестве параметров - PullRequest
0 голосов
/ 25 апреля 2018
Оператор

combineLatest возвращает Observable, который завершается, когда все наблюдаемые, переданные в качестве параметров, combineLatest complete.

Есть ли способ создать Observable, который ведет себя так же, как тот, который возвращается combineLatest, с той лишь разницей, что он завершается, когда first Observables передан как параметры завершены?

Ответы [ 2 ]

0 голосов
/ 25 апреля 2018

const a = Rx.Observable.interval(400).share();
const b = Rx.Observable.interval(600).share();
const c = Rx.Observable.interval(1000).take(3).share();

const combined = Rx.Observable.combineLatest(a, b, c)
  .takeUntil(
    Rx.Observable.merge(
      a.ignoreElements().concat(Rx.Observable.of('fin-a')).do(console.log),
      b.ignoreElements().concat(Rx.Observable.of('fin-b')).do(console.log),
      c.ignoreElements().concat(Rx.Observable.of('fin-c')).do(console.log)
    )
  );

combined
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.js"></script>

Вам нужно .share() входные наблюдаемые, потому что они нужны вам дважды, один раз для .combineLatest() и один раз для .takeUntil для завершения вашей наблюдаемойstream.

Я использовал .ignoreElements() в .takeUntil, чтобы игнорировать любые значения, и только когда исходный поток (либо a, b, либо c) завершил .concat a 'окончательное 'сообщение для него, чтобы дать сигнал .takeUntil для завершения нашей подписки на .combineLatest.Это также работает, если либо a,b,c не выдает никаких значений.

0 голосов
/ 25 апреля 2018

Да, вы можете; Вы можете сделать что-то вроде этого:

function combineLatestUntilFirstComplete(...args) {
  const shared = args.map(a => a.share());
  return Rx.Observable
    .combineLatest(...shared)
    .takeUntil(Rx.Observable.merge(...shared.map(s => s.last())));
}

const a = Rx.Observable.interval(100).map(index => `a${index}`).take(5);
const b = Rx.Observable.interval(200).map(index => `b${index}`).take(5);

combineLatestUntilFirstComplete(a, b).subscribe(
  value => console.log(JSON.stringify(value)),
  error => console.error(error),
  () => console.log("complete")
);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

Реализация принимает значения из наблюдаемой, возвращаемой из внутреннего вызова, в combineLatest, пока одна из исходных наблюдаемых не выдаст свое последнее значение.

Обратите внимание, что наблюдаемые источника являются общими, поэтому подписки из-за вызова takeUntil не влияют на вторичные подписки на наблюдаемые холодные источники.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...