Короткий ответ: вы не можете и не должны.Откуда наблюдаемый источник узнает, какие ошибки могут выдавать его наблюдатели?
Длинный ответ заключается в том, что обработка ошибок в RxJS изменилась - в лучшую сторону - в версии 6.
Если вы посмотрите на реализацию next
в Subject
, вы увидите, что нет обработки ошибок:
next(value?: T) {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
if (!this.isStopped) {
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) {
copy[i].next(value);
}
}
}
Субъект просто проходит через своих наблюдателей и next
на каждом.
Однако каждый наблюдатель заключен в Subscriber
.Если вы посмотрите на источник для subscribe
, то увидите, что Subscriber
создается путем передачи наблюдателя toSubscriber
.
В частности, созданному подписчику.SafeSubscriber
.И вот где обработка ошибок:
Если вы посмотрите на next
в SafeSubscriber
, вы увидите, что __tryOrUnsub
называется:
next(value?: T): void {
if (!this.isStopped && this._next) {
const { _parentSubscriber } = this;
if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._next, value);
} else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
this.unsubscribe();
}
}
}
__tryOrUnsub
попытается вызвать наблюдателя next
, и если произойдет ошибка, он отменит подписку наблюдателя от источника.
Любая ошибка, обнаруженная __tryOrUnsub
, будет сообщена с использованием hostReportError
- который выдает ошибку асинхронно, чтобы стек вызовов не был размотан.Это сделано для того, чтобы ошибки, возникающие в одном наблюдателе, не влияли на других наблюдателей.
Если вы добавите второго наблюдателя - который не выбрасывает - в ваш пример, вы должны увидеть, что второй наблюдатель ведет себя так же, как выЯ ожидал и получил "msg3"
.
Бен Леш объяснил эти изменения - и причины, по которым они были сделаны - в недавней презентации .Вы можете проверить это.