Живой рабочий пример .Тогда вам нужно использовать Subjects
.Обычные наблюдаемые являются одноадресными (это означает, что каждый подписчик владеет независимым исполнением Observable
).Таким образом, каждый наблюдатель получает вызов всей цепочки казни, которую вы имеете.
observable = Rx.Observable.timer(0, 1000)
.map(val => doSomething(val));
map
вызывается для каждого наблюдателя.
Субъекты являютсяспециальный тип Observables, который позволяет значениям быть многоадресными, это означает, что вы разделяете одну строку выполнения вашего Observable.Это rxjs6, если вы заблудились с конвейерными операторами, , пожалуйста, посмотрите здесь .
Прежде всего, получите imports
,
import { Observable, Subject, timer } from 'rxjs';
import { map, share } from 'rxjs/operators';
Тогда у вас есть,
const subject = new Subject();
const doSomething = val => {
console.log("doing something");
return val;
}
const observable = timer(0, 1000).pipe(
map(val => doSomething(val)),
).pipe(share());
const first = observable.subscribe(val => console.log("first:", val));
const second = observable.subscribe(val => console.log("second:", val));
const tercer = observable.subscribe(val => console.log("tercer:", val));
// After 1.5 seconds, stop first.
timer(1500).subscribe(_ => first.unsubscribe());
// After 2.5 seconds, stop second.
timer(2500).subscribe(_ => second.unsubscribe());
// After 2.5 seconds, stop second.
timer(2500).subscribe(_ => tercer.unsubscribe());