Rxjs, наблюдаемый с Внутренней подпиской, занимающей много времени, и внешняя наблюдаемая, выполняемая сначала в последующих запросах - PullRequest
0 голосов
/ 30 октября 2019

У меня есть кнопка, которая запускает Observable, затем я подписываюсь на результат, и у меня есть подписка, которая зависит от предыдущего результата Observable.

this.getData(params).subscribe(result => {
    // Make some first level process
    console.log('[Outter Execution]: ',result);

    this.getInnerData(result).subscribe(res => {
        // Make some inner level process
        console.log('[Inner Execution]: ',res);
    });
});

Зависит от скорости нажатия пользователя,последовательность не та же:

//User click slowly

[Outer Execution]
[Inner Execution]

[Outer Execution]
[Inner Execution]

//User start clicking quickly

[Outer Execution]
[Inner Execution]

[Outer Execution]
[Inner Execution]

[Outer Execution]
[Outer Execution]
[Outer Execution]

[Inner Execution]
[Inner Execution]
[Inner Execution]

[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]

Как видите, если вложенная подписка занимает много времени и пользователь нажимает еще раз, прежде чем внутренняя подписка будет разрешена, первое сообщение [Outer Execution] выходит из системы. до внутреннего исполнения было решено. Некоторое время спустя, предыдущие длительные внутренние подписки решаются и возвращаются зарегистрированные сообщения.

Я попытался безуспешно использовать операторы switchMap и mergeMap .

[Отредактировано]: необходимая мне функциональность - это выполнение в виде блока, и следующее внешнее выполнение должно выполняться после первого выполнения (внешняя и внутренняя подписка).

Ответы [ 2 ]

1 голос
/ 30 октября 2019

Чтобы гарантировать, что обработка ожидает , вам нужно использовать concatMap для главной наблюдаемой, а не для зависимой наблюдаемой. В противном случае он не ждет набора.

Я разветвился перед вашими последними изменениями, но вот то, что я придумал, похоже, работает:

  private clickSubject = new Subject<number>();
  clickAction$ = this.clickSubject.asObservable();

  ngOnInit() {
    this.clickAction$
      .pipe(
        concatMap(value => this.mainProcess(value)
          .pipe(
            mergeMap(x => this.dependantProcess(x))
          )
        )
      )
      .subscribe();
  }

  onClick(value) {
    // Emits a value into the action stream
    this.clickSubject.next(value);
  }

  mainProcess(value) {
    console.log("[Emitted] Main", value);
    return of(value).pipe(delay(10));
  }

  dependantProcess(value) {
    console.log("[Emitted] Dependent", value);
    return of(value).pipe(delay(2000));
  }

Обратите внимание, что concatMapиспользуется для ожидания mainProcess.

  • Этот код реагирует на щелчки пользователей, определяя поток действий с помощью RxJS Subject.

  • Каждый раз, когда пользователь нажимает кнопку, генерируется поток действий.

  • Конвейер потока действий использует concatMap для кэширования запроса и ожидания его обработки до обработки предыдущего запроса.

  • Когда запускается основной процесс, выполняется зависимый процесс.

  • Когда конвейер завершен (основной и зависимый процесс), затем следующийкэшированный запрос обрабатывается concatMap.

Имеет смысл?

Вы можете найти обновленный код здесь:

https://stackblitz.com/edit/angular-dependent-order-deborahk

0 голосов
/ 30 октября 2019

Существует много способов работы с RxJS, и без конкретного примера здесь сложно дать вам конкретный ответ. Лучшее, что я могу сделать, это предоставить вам пример из моего кода и надеяться, что он близок к тому, что вы пытаетесь достичь.

У меня есть товары с их поставщиками. Когда я получаю продукт, у него есть массив идентификаторов поставщиков. Поэтому мне нужно получить продукт, а затем найти поставщиков в рамках этой операции. (Звучит похоже на ваш сценарий?)

Вот мой код:

  selectedProductSuppliers$ = this.selectedProduct$
    .pipe(
      filter(selectedProduct => Boolean(selectedProduct)),
      switchMap(selectedProduct =>
        forkJoin(selectedProduct.supplierIds.map(supplierId => this.http.get<Supplier>(`${this.suppliersUrl}/${supplierId}`)))
      ),
      tap(suppliers => console.log('product suppliers', JSON.stringify(suppliers)))
    );

Этот код использует switchMap, чтобы гарантировать, что если пользователь нажимает несколько раз, он переключится на последнююselection.

Затем он использует forkJoin для обработки каждого идентификатора поставщика, отправляя запрос http get для получения данных каждого поставщика и объединения их в один передаваемый массив.

Возвращаемое значениеObservable<Supplier[]>

Вы можете найти полный пример здесь: https://github.com/DeborahK/Angular-RxJS

ОБНОВЛЕНИЕ:

Если вы измените код выше, чтобы использовать concatMap вместо switchMap он будет обрабатывать каждый запрос, ожидая, пока он не завершится, перед выполнением следующего.

enter image description here

enter image description here

...