Я думаю, что это должно работать:
rxjs.interval(1000).pipe(
operators.tap(() => console.log('start')),
operators.switchMap((_, idx) => rxjs.timer(1500).pipe(
idx > 0 ? operators.startWith('completed') : rxjs.identity,
)),
operators.tap(() => console.log('end')),
operators.takeUntil(this.destroy$),
).subscribe(console.log, console.log);
В предоставленной функции switchMap
у вас есть доступ к index
.
Если idx === 0
, это означает, что нет активной внутренней наблюдаемой, поэтому от нее не следует отказываться.
В противном случае, если текущая внутренняя наблюдаемая не завершена тем временем, если приходит новое внешнее значение, это означает, что активные внутренние объекты будут отменены, это то, что указывает startWith
.
Почему endWith
не работает
operators.switchMap(() => rxjs.timer(1500).pipe(
operators.finalize(() => console.log('this works')),
operators.endWith('cancelled'),
)),
Прежде всего, важно отметить, что
src$.pipe(endWith(value))
- это то же самое, что и
concat(src$, of(value))
, что по существу совпадает с:
of(src$, of(value))
.pipe(
mergeMap(obs$ => obs$, 1) // concurrency set to 1
)
Как вы знаете, mergeMap
обрабатывает внутренние наблюдаемые. Внутренняя наблюдаемая требует внутреннего подписчика . В этом случае каждый внутренний подписчик будет добавлен в список _subscriptions
получателя. В этом случае целевой абонент является внутренним абонентом switchMap
. Иными словами, все внутренние подписчики mergeMap
могут рассматриваться как дочерние элементы внутреннего подписчика switchMap
.
Когда новое внешнее значение перехватывается switchMap
, оно отписывается от текущего внутреннего абонент. Когда это происходит, он отписывается от всех своих подписчиков-потомков, что будет делать то же самое для своих потомков и так далее. Таким образом, endWith
не будет работать, потому что его потребитель (внутренний подписчик switchMap
) отписывается.
EDIT - случай, когда внутренняя наблюдаемая завершает
ngOnInit(): void {
rxjs.interval(1000).pipe(
operators.tap(() => console.log('start')),
operators.switchMap(function (_, idx) {
return rxjs.timer(Math.random() * 1500).pipe(
operators.tap(null, null, () => console.warn("complete!")),
idx > 0 && this.innerSubscription ? operators.startWith(true) : rxjs.identity,
)}),
operators.tap(() => console.log('end')),
operators.takeUntil(this.destroy$),
).subscribe(console.log, console.log);
}
SwitchMapSubscriber.innerSubscription
равен null
, когда активных подписчиков нет (например, внутренняя наблюдаемая завершена).
Редактировать 2 - с использованием субъекта
Эта альтернатива предполагает использование Subject
, который будет излучать каждый раз Внутренние объекты, поддерживаемые switchMap
, отписываются.
const interruptSubject = new Subject();
src$ = src$.pipe(
switchMap(
(/* ... */) => timer(/* ... */)
.pipe(
/* ... */
finalize(() => interruptSubject.next(/* ...your action here... */))
)
)
);
merge(src$, interruptSubject)
.subscribe(/* ... */)