Процесс создания Observable
проходит следующим образом:
* Observable
определяется автором (здесь вручную с new
, для целей объяснения):
const myObservable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
return function tearDownLogic() {
console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
}
});
Обратный вызов subscribe
, переданный выше Observable
, сохраняется локально с помощью Observable конструктора :
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
}
Итак, у нас есть вся функция subscribe
, определенная какus или любой другой предварительно сделанный Observable
, сохраненный для последующего выполнения.
Наблюдатель может быть передан обратному вызову subscribe
в одной из нескольких форм.Либо как одна-три функции напрямую ( next , error , complete ), либо как объект с одним или несколькими из тех же трех методов.Для целей этого объяснения мы реализуем последний и более подробный параметр:
const observer = {
next(v) {
console.log(v);
}
error(err) {
console.log(err);
}
complete() {
console.log('Observable has now completed and can no longer emit values to observer');
}
}
Теперь начинается интересная часть.Мы передаем observer
в метод Observable.subscribe(..)
:
myObserver.subscribe(observer);
Метод подписки выглядит следующим образом:
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {
const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
if (operator) {
sink.add(operator.call(sink, this.source));
} else {
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
this._subscribe(sink) :
this._trySubscribe(sink)
);
}
if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}
return sink;
}
Вкратце, subscribe
method:
- Получает
observer
в одной из ранее обсужденных форм toSubscriber
преобразует наблюдателя в объект Subscriber
независимо от того, передан ли он в форме (экземпляр Subscriber
сохраняется в переменной sink
) - Примечание. Переменная
operator
равна undefined
, если вы не подписаны на оператора.Таким образом, просто игнорируйте операторы if
вокруг operator
Subscriber
расширяет (связан с прототипом) объект Subscription
, который имеет два важных метода в своем прототипе: unsubscribe()
, add()
add(..)
используется для добавления " логики прерывания " (функции) к Observable
, который будет выполняться, когда Observable
завершит или отписался .Он возьмет любую переданную ему функцию, обернет ее в объект Subscription
и поместит функцию в переменную Subscription
_unsubscribe
.Этот Subscription
сохраняется в Subscriber
, который мы создали выше, в переменной с именем _subscriptions
.Как уже отмечалось, мы делаем все для того, чтобы Subscriber
было неподписано или завершает , вся логика add()
'ed разрыва выполняет - В качестве примечания
Observable.subscribe()
возвращает экземпляр Subscriber
.Таким образом, вы можете вызвать mySubscriber.add( // some tear down logic)
на нем в любой момент, чтобы добавить функции, которые будут выполняться, когда Observable
завершит или будет отписаться - Важная часть теперьenfolds:
this._trySubscribe(sink)
выполняется (внутри add()
, как параметр)._trySubscribe(..)
- это функция, которая фактически выполняет обратный вызов subscribe
, ранее сохраненный конструктором Observable
.Важно отметить, что он передается в sink
(наш новый экземпляр Subscriber
) в качестве обратного вызова для обратного вызова Observable
.Другими словами, когда subscriber.next(1)
внутри Observable
выполняется, мы фактически выполняем next(1)
в экземпляре sink
(Subscriber
) (next()
находится на прототипе Subscriber
).
Итак, на данный момент это подводит меня к концу.Внутри toSubscribe
и вокруг процесса unsubscribe , помимо прочего, есть больше деталей, но они выходят за рамки этого Q & A.
Вкратце, чтобы ответить на вопрос вназвание, Наблюдатель действительно передается в Observable
, просто после преобразования в объединяющий объект Subscriber
.
Надеюсь, это поможет кому-то еще в будущем.