Полагаю, вы хотите, чтобы source
Observable запускал какую-то задачу, а затем имел только следующее значение из source
, чтобы запускать следующую задачу, когда предыдущая задача выполнена. Вы можете достичь этого с помощью zipping
вашего source
с помощью второго триггера (startNext
), который указывает, что предыдущая задача выполнена, и может быть выдано следующее значение от source
, запускающее следующую задачу.
import { Subject, zip, of, BehaviorSubject } from 'rxjs';
import { map, tap, delay, concatMap } from 'rxjs/operators';
const source = new Subject();
const startNext = new BehaviorSubject(null);
zip(source, startNext)
.pipe(
map(([s, n]) => s), // discard the 'startNext' trigger
concatMap(s => of(s).pipe(delay(1000))),
concatMap(s => of(s).pipe(delay(200))),
concatMap(s => of(s).pipe(delay(3000))),
tap(_ => startNext.next(null))
).subscribe(s => console.log('result for', s));
source.next('a');
source.next('b');
source.next('c');
https://stackblitz.com/edit/rxjs-g5efuc