Вот один из подходов:
const be$ = concat(
of(1).pipe(delay(100)),
of(2).pipe(delay(100)),
of(3).pipe(delay(100)),
of(4).pipe(delay(100)),
of(5).pipe(delay(100)),
of(6).pipe(delay(100)),
of(10).pipe(delay(500)), // After freeze
of(11).pipe(delay(100)),
of(12).pipe(delay(100)),
).pipe(shareReplay({ bufferSize: 1, refCount: true, }), endWith(null));
// `skip(1)` - the `ReplaySubject` used be `shareReplay()` will give us the latest value
// and it's not needed
const beReady$ = be$.pipe(skip(1), take(1));
const fe$ = be$.pipe(
mergeMap(v => merge(
of(v),
of(v).pipe(
expand(v => timer(100).pipe(map(v1 => 1 + v))),
takeUntil(beReady$),
)
)),
distinctUntilChanged(),
filter(v => v !== null)
).subscribe(console.log)
endWith(null)
- чтобы остановить рекурсию, когда испускается последнее значение (12
), нам нужен источник, чтобы испустить что-то еще
shareReplay
- необходимо предоставить доступ к источнику, поскольку будет другой подписчик (beReady$
), кроме основного подписчика (fe$
)
mergeMap(v => merge(
of(v), // Pass along the current value
of(v).pipe(
// If the `be$` does not emit in the next 100ms, send the `currentValue + 1`
// and keep doing the same until the `be$` finally emits
expand(v => timer(100).pipe(map(v1 => 1 + v))),
takeUntil(beReady$),
)
)),
expand
похоже на использование mergeMap
, но:
- он будет проходить по внутреннему значению
- он создаст другую внутреннюю наблюдаемую на основе последнего внутреннего значения ; Итак, это рекурсивно
takeUntil(beReady$)
, как рекурсия может быть остановлена
StackBlitz