import { fromEvent, interval, Subject, Observable, of } from "rxjs";
import { takeUntil, switchMap, catchError } from "rxjs/operators";
const obs1 = p1 => {
return new Observable(observer => {
setTimeout(() => {
console.log("obs1 doing");
observer.next(p1 + "1");
observer.complete();
}, 1000);
});
};
const obs2 = p2 => {
return new Observable(observer => {
setTimeout(() => {
console.log("obs2 doing");
observer.next(p2 + "2");
observer.complete();
}, 1000);
});
};
const obs3 = p3 => {
return new Observable(observer => {
setTimeout(() => {
console.log("obs3 doing");
observer.next(p3 + "3");
observer.complete();
}, 1000);
});
};
const obsError1 = () => console.log("obs1 is error")
const obsError2 = () => console.log("obs2 is error")
const obsError3 = () => console.log("obs3 is error")
const cancle = new Subject();
new Observable(observer => {
obs1(0).subscribe(
x1 => {
obs2(x1).subscribe(
x2 => {
obs3(x2).subscribe(
x3 => {
observer.next(x3);
observer.complete();
},
obsError3
);
},
obsError2
);
},
obsError1
);
}).pipe(takeUntil(cancle)).subscribe(()=>{
console.log()
});
setTimeout(() => {
console.log("cancle doing");
cancle.next();
cancle.complete();
}, 100);
Когда он запускается cancle.next()
, он по-прежнему печатает obs2 doing
и obs3 doing
Я знаю, что переход на серийный номер можно отменить, например:
setTimeout(() => {
console.log("cancle doing");
cancle.next();
cancle.complete();
}, 100);
of(0)
.pipe(
switchMap(x1 => obs1(x1)),
catchError(error => {
obsError1()
throw error;
})
).pipe(
switchMap(x2 => obs2(x2)),
catchError(error => {
obsError2()
throw error;
})
)
.pipe(
switchMap(x3 => obs3(x3)),
catchError(error => {
obsError3()
throw error;
})
).pipe(
takeUntil(cancle)
).subscribe(x => {
console.log(x);
});
Но когда возникает ошибка в obs1, obsError2
также будет выполняться
Я не знаю лучший способ обработки ошибок
весь код находится в https://stackblitz.com/edit/mbqhrs