const a = Rx.Observable.interval(400).share();
const b = Rx.Observable.interval(600).share();
const c = Rx.Observable.interval(1000).take(3).share();
const combined = Rx.Observable.combineLatest(a, b, c)
.takeUntil(
Rx.Observable.merge(
a.ignoreElements().concat(Rx.Observable.of('fin-a')).do(console.log),
b.ignoreElements().concat(Rx.Observable.of('fin-b')).do(console.log),
c.ignoreElements().concat(Rx.Observable.of('fin-c')).do(console.log)
)
);
combined
.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.js"></script>
Вам нужно .share()
входные наблюдаемые, потому что они нужны вам дважды, один раз для .combineLatest()
и один раз для .takeUntil
для завершения вашей наблюдаемойstream.
Я использовал .ignoreElements()
в .takeUntil
, чтобы игнорировать любые значения, и только когда исходный поток (либо a
, b
, либо c
) завершил .concat
a 'окончательное 'сообщение для него, чтобы дать сигнал .takeUntil
для завершения нашей подписки на .combineLatest
.Это также работает, если либо a,b,c
не выдает никаких значений.