Как мне использовать Rx JS & Observables для выполнения HTTP-запроса только после запуска нескольких других событий? - PullRequest
1 голос
/ 03 августа 2020

У меня возникли проблемы с объединением переданных значений двух разных Observable s для использования в HTTP-запросе, а затем с возвратом Observable запроса для использования клиентами.

Некоторая предыстория: я работаю на расширении Twitch. Частью их экосистемы является то, что расширения получают информацию о среде через обратные вызовы событий. Те, которые меня интересуют, расположены по номерам window.Twitch.ext.onAuthorized() и window.Twitch.ext.configuration.onChanged() (если интересно, см. Подробности здесь: https://dev.twitch.tv/docs/extensions/reference#helper -extensions ).

При звонках на мой backend, мне нужна информация из обоих вышеуказанных событий. Они не будут часто меняться (если вообще когда-либо), но я не могу выполнять вызовы, пока они оба не станут доступны, и я хочу получать последнее предоставленное значение при выполнении вызовов. Похоже, что BehaviorSubject s было бы идеально для этого:

export class TwitchService {
  private authSubject: BehaviorSubject<TwitchAuth> = new BehaviorSubject<TwitchAuth>(null);
  private configSubject: BehaviorSubject<TwitchConfig> = new BehaviorSubject<TwitchConfig>(null);
  private helper: any;

  constructor(private window: Window) {
    this.helper = this.window['Twitch'].ext;
    this.helper.onAuthorized((auth: TwitchAuth) => {
      this.authSubject.next(auth);
    });
    this.helper.configuration.onChanged(() => {
      this.configSubject.next(JSON.parse(this.helper.configuration.global.content));
    });
  }

  onAuthorized(): Observable<TwitchAuth> {
    return this.authSubject.asObservable().pipe(filter((auth) => !!auth));
  }

  onConfig(): Observable<TwitchConfig> {
    return this.configSubject.asObservable().pipe(filter((config) => !!config));
  }
}

Эта модель хорошо работает для частей приложения, которые подписаны на одну из этих двух Observable s. Моя проблема в том, что я не могу найти рабочий способ объединить их и использовать последние значения, полученные от обоих, для создания одноразового Observable для HTTP-запросов.

Вот что у меня есть:

type twitchStateToObservable<T> = (auth: TwitchAuth, config: TwitchConfig) => Observable<T>;

export class BackendService {
  constructor(private http: HttpClient, private twitch: TwitchService) {}

  private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
    let subject = new Subject<T>();
    combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
      first(), // because we only want to make the http request one time
      map(([auth, config]) => f(auth, config).subscribe((resp) => subject.next(resp)))
    );
    return subject.asObservable();
  }

  exampleHttpRequest(): Observable<Response> {
    return this.httpWithTwitchState((auth, config) =>
      // this.url() and this.headers() are private functions of this service
      this.http.get<Response>(this.url(config) + 'exampleHttpRequest', { headers: this.headers(auth, config)})
    );
  }
}

И затем клиент этой службы должен иметь возможность делать HTTP-запросы с помощью этого простого вызова, без необходимости знать что-либо о событиях или заботиться о том, когда они срабатывают:

this.backend.exampleHttpRequest().subscribe((resp) => {
  // do stuff with resp here...but this is never hit
}

На основе Насколько я понимаю, combineLatest() должен выдавать новые значения всякий раз, когда любой из входных Observable s испускает новое значение. Однако вызов f(auth, config) внутри map() никогда не запускается в моем приложении. Я тестировал его с точками останова, с console.log s и следил за вкладкой «Сеть» в инструментах отладчика браузера. Возможно, вызов first() сбивает его, но я не хочу повторять http-запрос, если события снова срабатывают, по очевидным причинам.

Заранее благодарим за любые советы или указатели!

Ответы [ 2 ]

1 голос
/ 04 августа 2020

Хорошо, оказывается, я упустил кое-что очевидное. Я не подписывался на вызов combineLatest(), поэтому он никогда не выполнялся. После добавления subscribe() в конце это работало нормально:

private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
  let subject = new Subject<T>();
  combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
      first(),
      map(([auth, config]) => f(auth, config).subscribe((resp) => subject.next(resp)))
  ).subscribe(); // adding this fixed the problem
  return subject.asObservable();
} 

После этого я провел еще небольшое исследование о том, как отключить подписки в подобных случаях, что привело меня к flatMap() оператор. Итак, теперь функция httpWithTwitchState() выглядит так:

private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
  return combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
    first(),
    map(([auth, config]) => f(auth, config)),
    flatMap((resp) => resp)
  );
}

Теперь она работает, как ожидалось, и стала чище. Спасибо тем, кто нашел время, чтобы прочитать мой вопрос и / или ответ.

1 голос
/ 03 августа 2020

Вы можете думать о concat как о линии в банкомате, следующая транзакция (подписка) не может начаться, пока не завершится предыдущая!

Пример: https://stackblitz.com/edit/typescript-3qsfgk?file=index.ts&devtoolsheight=100

Пример:

const mergeValue = concat(authSubject,configSubject) 

mergeValue.subscribe( (x) => {
  // do stuff with resp here
});
...