У меня есть больше обходного пути, чем решение.
Ваше требование - запускать параллельные запросы и, в зависимости от первого ответа, отменить второй.
Параллельные запросы могут быть выполнены с использованием forkJoin
, но все наблюдаемые разрешаются вместе,
merge()
также будет запускать параллельные запросы, но любой ответ может прийти в любом порядке. С помощью merge () мы не сможем определить, какой ответ пришел от какого Observable. Если у вас есть свобода изменять возвращаемую наблюдаемую и добавлять флаг для указания на индекс наблюдаемой, то вы можете достичь этого с помощью некоторых дополнительных флагов и кода, выглядящего так:
export class AppComponent {
name = 'Angular';
obsOne = of('First Obs').pipe(map((res) => {
return {
firstObs: true,
result: res
}
}))
obsTwo = of('Second Obs').pipe(delay(6000))
secondObsReturned = false
timerHandle
obsSubcription: Subscription;
ngOnInit() {
this.obsSubcription = merge(this.obsOne, this.obsTwo).subscribe((data) => {
// you can add all this logic in pipe(map()) instead of handling in subscribe
console.log(`data returned`, data)
// some appropriate checks here
if (typeof data === 'object' && data.hasOwnProperty('firstObs')) {
if (!this.secondObsReturned) {
// can use rxjs timer here
this.timerHandle = setTimeout(() => {
console.log('Delayed more than 5 seconds');
this.obsSubcription.unsubscribe();
}, 5000)
}
}
else {
// this is the second onservable (which may have come early)
this.secondObsReturned = true;
}
})
}
}
См. Пример здесь: https://stackblitz.com/edit/angular-s6wkk2
EDIT
Итак, я думал о каком-то способе избежать изменения возвращаемого Observable, и я придумал CombineLatest
. С последним объединением в последний раз он будет ожидать значения в обеих наблюдаемых, после чего он будет излучать, даже если какой-либо из наблюдаемых разрешит.
Чтобы использовать это, снова есть ограничение. Например, вам нужно знать конкретное значение, которое Observables никогда не вернет, скажем, false
, поэтому, если вы знаете, что Observables никогда не будут возвращаться false
(или любое значение по умолчанию), тогда вы можете использовать BehaviorSubjects и combLatest. Инициализируйте объекты BehaviorSubjects значением, которое невозможно вернуть.
Вам нужно будет нажать на наблюдаемое, чтобы добавить значения к предмету.
// give appropriate types
subjectOne = <any> new BehaviorSubject(false); // will contain value of the first observable
subjectTwo = <any> new BehaviorSubject(false); // will contain value of the second observable
takeUntilSub = new Subject(); // use this to stop the subscriptions
obsOne = of('First Obs')
.pipe(
tap((value) => {
this.subjectOne.next(value);
}),
catchError((e) => {
// if an Error occurs in first then you don't want to proceeed at all
// add an error in the subjectOne, this will stop the combineLatest stream.
this.subjectOne.error('Observable one errored')
return throwError;(e)
})
)
obsTwo = of('Second Obs')
.pipe(
delay(6000),
tap((value) => {
this.subjectTwo.next(value);
}),
catchError((e) => {
// if you want to continue the stream, you need to handle the error and return a success.
// no need to populate the subject coz you don't care about this error
return of(e)
})
)
secondObsReturned = false
timerHandle;
ngOnInit() {
// calling the actual Observables here.
merge(this.obsOne, this.obsTwo).pipe(takeUntil(this.takeUntilSub)).subscribe()
// this will be called once for the very first time giving values as false for both of them (or the emitted initial values)
// after that when any one of them resolves, flow will come here
combineLatest(this.subjectOne, this.subjectTwo).pipe(takeUntil(this.takeUntilSub)).subscribe(([dataFromObsOne, dataFromObsTwo]) => {
console.log(`data received: ${dataFromObsOne} and ${dataFromObsTwo}`)
if (dataFromObsTwo !== false) {
// second observable was resolved
this.secondObsReturned = true;
if (this.timerHandle) {
clearTimeout(this.timerHandle);
}
}
if (dataFromObsOne !== false) {
// first observable resoved
if (!this.secondObsReturned) {
// if second obs hasn't already been resolved then start a timer.
this.timerHandle = setTimeout(() => {
console.log('Delayed more than 5 seconds');
this.takeUntilSub.next(true); // stop all subscriptions
}, 5000)
}
}
})
}
См. Пример здесь: Код ссылки .