Вы можете использовать оператор расширения из rxjs.Он будет зацикливаться до тех пор, пока не будет предоставлено значение empty ().Вот пример:
import { empty } from 'rxjs';
private service; <--- service that gives us the observable by some value
private initialValue: number = 5;
private counter: number = 0;
private source$: Observable<number> = this.service.getSourceWithValue(initialValue);
source$.pipe(
expand(value => isCounterExceeded()
? incrementCounterAndGetNextSourceObservableWithValue(value);
: empty()
);
// if counter is not exceeded we will increment the counter and create another
// observable based on current value. If it is exceeded, we are stopping the loop by
// returning the empty() observable
private incrementCounterAndGetNextSourceObservableWithValue(value: number): Observable<number> {
this.counter++;
return this.service.getSourceWithValue(value);
}
private isCounterExceeded() {
return this.counter >= 4;
}