Пакетирование подписчиков на асинхронные запросы в полете - RxJS - PullRequest
1 голос
/ 25 октября 2019

У меня есть некоторые функции, которые вызывают другие асинхронные функции и возвращают результаты в виде наблюдаемых. Эти функции могут быть подписаны много раз в разных частях моего приложения одновременно.

Я бы хотел предотвратить повторный запуск асинхронной функции, если она все еще "в полете", однако все равно выдает значениевсе подписчики, как только он завершает. Если не в полете, он должен снова вызвать асинхронную функцию.

Есть ли лучший шаблон для этого;я подхожу к этому неправильно?

Я создал тему для сохранения результата и флажок для отслеживания запроса в полете.

  inFlight = false;
  subject$ = new Subject<any>();
  requestsLog = [];

  getThing() {
    console.log("(getThing) running?", this.inFlight);
    return iif(() => this.inFlight, this.subject$, this.fakeAsyncRequest$())
      .pipe(
        take(1),
        tap(date => console.log("(getThing) get value", date)),
        tap(date => this.requestsLog.push(date))
      );
  }

  fakeAsyncRequest$ = () => {
    return of(new Date().toUTCString()).pipe(
      tap(_ => {
        console.log("(fakeAsyncRequest) request");
        this.requestsLog = []; // Reset things
        this.inFlight = true; // Set in flight flag
      }),
      delay(1500), // Simulate async delay
      tap(date => this.subject$.next(date)),
      finalize(() => {
        console.log("(fakeAsyncRequest) Done");
        this.inFlight = false;
      })
    );
  };

  smilutateMultiple() {
    // Simulate a few calls to this function
    this.getThing().subscribe();
    this.getThing().subscribe();
    this.getThing().subscribe();
    setTimeout(() => {
      this.getThing().subscribe();
    }, 500);
  }

Я также пытался использовать BehaviourSubject в комбинациис ExhaustMap, но внутренняя наблюдаемая по-прежнему вызывается для каждой подписки на наблюдаемое.

  private subject$ = new BehaviorSubject<any>(false);
  public subjectObs$ = this.subject$
    .asObservable()
    .pipe(exhaustMap(() => this.fakeAsyncRequest()));

  fakeAsyncRequest = () => {
    console.log("call fake request", new Date().toUTCString());
    return this.http
    .get("https://www.reddit.com/hot.json")
    .pipe(delay(1000));
  };

  smilutateMultiple() {
    // Simulate a few subscriptions to this observable
    this.subjectObs$.subscribe(thing => console.log("Thing", thing));
    this.subjectObs$.subscribe(thing => console.log("Thing", thing));
    // Should be same request.
    setTimeout(() => {
      this.subjectObs$.subscribe(thing => console.log("Thing", thing));
    }, 500);
    // Should be new request.
    setTimeout(() => {
      this.subjectObs$.subscribe(thing => console.log("Thing", thing));
    }, 3000);
  }

Ответы [ 3 ]

1 голос
/ 25 октября 2019

Я искал оператора share . Это «делит» источник, наблюдаемый с несколькими подписчиками.

Это относится к понятиям "горячие против холодных" наблюдаемых и "многоадресная передача".

Демонстрация: https://stackblitz.com/edit/angular-akfcy5


  public observable$ = this.fakeAsyncRequest$().pipe(share());

  private fakeAsyncRequest$() {
    return this.http.get("https://www.reddit.com/hot.json").pipe(
      tap(() => console.log("Call fake request at:", new Date().toUTCString())),
      delay(500)
    );
  }

  smilutateMultiple() {
    // Simulate a few subscriptions to this observable
    this.observable$.subscribe(thing => console.log("Thing 1", thing));
    this.observable$.subscribe(thing => console.log("Thing 2", thing));
    // Should be same request.
    setTimeout(() => {
      this.observable$.subscribe(thing => console.log("Thing 3", thing));
    }, 500);
    // Should be new request.
    setTimeout(() => {
      this.observable$.subscribe(thing => console.log("Thing 4", thing));
    }, 3000);
  }

1 голос
/ 25 октября 2019

Реактивный модуль JS - RxJS, безусловно, лучший способ избежать этого. В RxJS вы можете иметь состояние, которое выглядит следующим образом:

export interface ProjectState extends EntityState<Project> {
    selectedProjectId: string;
    creating: boolean;
    created: boolean;
    loading: boolean;
    loaded: boolean;
    error: string;
}

. В таком состоянии, когда вы запускаете действие - LOAD_PROJECTS, редуктор помечает флаг «загрузка» как истинный. Теперь в других частях вашего приложения вы можете подписаться на «проекты» и флаг «загрузки». Если флаг загрузки имеет значение false, отправьте LOAD_PROJECTS, иначе не отправляйте.

Подписка на объекты «проекты»будет гарантировать, что подписчик внутри каждого компонента вызывается при обновлении ваших данных. Таким образом, предотвращаются любые дополнительные вызовы.

RxJS может показаться сложным и сложным в начале. Однако это одна из лучших структурированных библиотек для совместного использования. состояние через приложение.

0 голосов
/ 25 октября 2019

Вы можете создать фасад, использовать его для извлечения данных из разных частей вашего приложения и использовать оператор exhaustMap.

Дополнительная информация для оператора exhaustMap: https://rxjs-dev.firebaseapp.com/api/operators/exhaustMap

...