Я думаю, что вы используете неправильный оператор здесь. Внутренняя карта также возвращает издателя.
Вам следует преобразовать внешнего издателя с помощью flatMap
, чтобы создать один поток, а не вложенный поток.
Вот пример в чистом RxJS 6:
import { of, EMPTY } from 'rxjs';
import { map, flatMap, catchError } from 'rxjs/operators';
of(3,2,1,0,1,2,3).pipe(
flatMap(v => {
return of(v).pipe(
map(x => {
if(x===0) throw Error();
return 6 / x;
}),
catchError(error => {
console.log("Shit happens")
return EMPTY
}
)
)
}
))
.subscribe(val => console.log("Request " + val + " logged "));
Каждый запрос (здесь номера) представляет собой плоскую карту в вещь, которая сохраняется.
Плоская карта означает, что персистент снова возвращает наблюдаемое. См. https://rxjs -dev.firebaseapp.com / api / operator / flatMap
Обработка ошибок этих внутренних наблюдаемых выполняется оператором catchError
. Он регистрирует ошибку, а затем возвращает пустую наблюдаемую, чтобы указать, что внутренняя наблюдаемая является «мертвой». Вы можете вернуть еще одну наблюдаемую здесь, и внутренняя часть продолжится.
Внешняя наблюдаемая, то есть ваши входящие запросы, продолжаются по пути.
У меня есть приложение для работы со стеком:
https://rxjs -qkqkm2.stackblitz.io
Удачи с NestJS и всеми различными версиями RxJS. Это выше версия 6.
Edit:
Метод
RxJS tap
является хорошим подходом для обработки побочных эффектов. Реализуйте метод перехвата следующим образом:
public intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
if (!this.reflector.get<boolean>(RequestMetaData.IS_PUBLIC_ROUTE, context.getHandler())) {
const host = context.switchToHttp();
const req = host.getRequest();
const resp = host.getResponse();
return next.handle().pipe(
tap({
next: (val) => {
this.persistRequest(val, req, resp);
},
error: (error) => {
this.persistRequest(AppError.from(error), req, resp);
}
})
);
}
return next.handle();
}