В RxJS Observer вводится в выполнение Observable? - PullRequest
0 голосов
/ 30 января 2019

Я прочитал документацию ReactiveX несколько раз, и все еще не могу точно понять, что происходит, когда Наблюдатель подписывается на Наблюдаемый .

Давайте рассмотрим простой пример:

import { Observable } from 'rxjs'; 

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.complete();
});

const observer = {
  next: (x) => console.log('got value ' + x),
  error: (err) => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done')
};

observable.subscribe(observer);

Код StackBlitz .


Мой вопрос:

Откуда берется объект subscriber, который передается в Observable ?

Из документации RxJS :

Не случайно observable.subscribe и subscribe в new Observable(function subscribe(subscriber) {...}) имеют одно и то же имя.В библиотеке они разные, но для практических целей вы можете считать их концептуально равными.

Таким образом, по-видимому, объект, переданный в обратный вызов subscribe в ObservableКонструктор (subscriber) - это , а не на самом деле observer объект.По крайней мере, если вы воспользуетесь приведенной выше цитатой о том, как на самом деле работает библиотека.

Если передается не объект observer, то что именно вызывает subscriber.next(1) и subscribe.complete()?Как это связано со свойством next в observer?


Уточняющий правка:

Я знаю, как использовать RxJS и действительно, что можно концептуально представить, что Наблюдатель вводится (как говорится в цитате).Тем не менее, я здесь, чтобы понять, как это на самом деле работает.

Ответы [ 2 ]

0 голосов
/ 07 февраля 2019

Процесс создания Observable проходит следующим образом:

* Observable определяется автором (здесь вручную с new, для целей объяснения):

const myObservable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  return function tearDownLogic() {
    console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
  }
});

Обратный вызов subscribe, переданный выше Observable, сохраняется локально с помощью Observable конструктора :

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
  if (subscribe) {
    this._subscribe = subscribe;
  }
}

Итак, у нас есть вся функция subscribe, определенная какus или любой другой предварительно сделанный Observable, сохраненный для последующего выполнения.

Наблюдатель может быть передан обратному вызову subscribe в одной из нескольких форм.Либо как одна-три функции напрямую ( next , error , complete ), либо как объект с одним или несколькими из тех же трех методов.Для целей этого объяснения мы реализуем последний и более подробный параметр:

const observer = {
  next(v) {
    console.log(v);
  }
  error(err) {
    console.log(err);
  }
  complete() {
    console.log('Observable has now completed and can no longer emit values to observer');
  }
}

Теперь начинается интересная часть.Мы передаем observer в метод Observable.subscribe(..):

myObserver.subscribe(observer);

Метод подписки выглядит следующим образом:

  subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
            error?: (error: any) => void,
            complete?: () => void): Subscription {


    const { operator } = this;
    const sink = toSubscriber(observerOrNext, error, complete);


    if (operator) {
      sink.add(operator.call(sink, this.source));
    } else {
      sink.add(
        this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
        this._subscribe(sink) :
        this._trySubscribe(sink)
      );
    }


    if (config.useDeprecatedSynchronousErrorHandling) {
      if (sink.syncErrorThrowable) {
        sink.syncErrorThrowable = false;
        if (sink.syncErrorThrown) {
          throw sink.syncErrorValue;
        }
      }
    }


    return sink;
  }

Вкратце, subscribe method:

  1. Получает observer в одной из ранее обсужденных форм
  2. toSubscriber преобразует наблюдателя в объект Subscriber независимо от того, передан ли он в форме (экземпляр Subscriber сохраняется в переменной sink)
  3. Примечание. Переменная operator равна undefined, если вы не подписаны на оператора.Таким образом, просто игнорируйте операторы if вокруг operator
  4. Subscriber расширяет (связан с прототипом) объект Subscription, который имеет два важных метода в своем прототипе: unsubscribe(), add()
  5. add(..) используется для добавления " логики прерывания " (функции) к Observable, который будет выполняться, когда Observable завершит или отписался .Он возьмет любую переданную ему функцию, обернет ее в объект Subscription и поместит функцию в переменную Subscription _unsubscribe.Этот Subscription сохраняется в Subscriber, который мы создали выше, в переменной с именем _subscriptions.Как уже отмечалось, мы делаем все для того, чтобы Subscriber было неподписано или завершает , вся логика add() 'ed разрыва выполняет
  6. В качестве примечания Observable.subscribe() возвращает экземпляр Subscriber.Таким образом, вы можете вызвать mySubscriber.add( // some tear down logic) на нем в любой момент, чтобы добавить функции, которые будут выполняться, когда Observable завершит или будет отписаться
  7. Важная часть теперьenfolds: this._trySubscribe(sink) выполняется (внутри add(), как параметр)._trySubscribe(..) - это функция, которая фактически выполняет обратный вызов subscribe, ранее сохраненный конструктором Observable.Важно отметить, что он передается в sink (наш новый экземпляр Subscriber) в качестве обратного вызова для обратного вызова Observable.Другими словами, когда subscriber.next(1) внутри Observable выполняется, мы фактически выполняем next(1) в экземпляре sink (Subscriber) (next() находится на прототипе Subscriber).

Итак, на данный момент это подводит меня к концу.Внутри toSubscribe и вокруг процесса unsubscribe , помимо прочего, есть больше деталей, но они выходят за рамки этого Q & A.

Вкратце, чтобы ответить на вопрос вназвание, Наблюдатель действительно передается в Observable, просто после преобразования в объединяющий объект Subscriber.

Надеюсь, это поможет кому-то еще в будущем.

0 голосов
/ 30 января 2019

Нет, наблюдатель не вводится в наблюдаемое.

AFAICT, Путаница связана с тем, что синтаксис new Observable(...) является скорее фабрикой низкого уровня, чем полезным шаблоном.

Это, в большей или меньшей степени, механизм, который используется в более простых реализациях, таких как of(value1, value2, ..., valueN), from(enumeration) и fromEvent(...).

Эти методы являются фактическим вариантом использования, на котором вам следует сосредоточиться.

Под прикрытием все эти методы соединяют некое подобие синхронного или асинхронного значения или взаимодействия в чудесный мир наблюдаемых потоков.Для этого они, так или иначе, действуют как как надлежащий Наблюдатель: они генерируют предметы и помещают их в поток.Для этого они используют функцию, которая называется next.Как и метод в реализациях Observer, bacause на самом деле вызывается точно так же.

В частности, вы можете посмотреть на реализацию метода подписки здесь:

https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts

Если вы хотите узнать, что на самом деле происходит во время подписки, я предлагаю вам взглянуть на код.Но, IMO, вы должны попробовать только после ознакомления с различными функциями создания Observable.

Надеюсь, это поможет.

...