RxJS: Как запустить 2 параллельных запроса, один из которых необязательный, с таймаутом - PullRequest
3 голосов
/ 09 мая 2019

У меня есть случай, когда мне нужно запустить 2 GET-запроса одновременно:

  1. Первый из них обязательный . Мне нужно дождаться ответа, и если всплывет какая-то ошибка, у меня будет обработка исключений, и я уже могу 2. отменить свой запрос.
  2. Второй - необязательно . В случае ошибки, я могу игнорировать случай. И я хочу ждать макс. 5 секунд «больше» для этого вызова, если это займет больше времени, я хочу отменить запрос (я знаю, что не могу отменить запущенный запрос, но просто игнорирую возвращенное значение / или возвращенную ошибку). Таким образом, если 1. вызов может потребоваться 20 секунд. 2. вызов может подождать макс. 25sec. Если 1. вызову требуется всего 1сек. 2. вызов не может ждать более 6 секунд. и т.д.

Как я могу реализовать это с помощью rxjs?

Я знаю, что могу заархивировать несколько запросов, но во всех примерах, которые я до сих пор видел, был только 1 блок обработки ошибок, но здесь мне нужно различать два случая ошибок.

Заранее спасибо

Ответы [ 2 ]

1 голос
/ 09 мая 2019

У меня есть больше обходного пути, чем решение. Ваше требование - запускать параллельные запросы и, в зависимости от первого ответа, отменить второй.

Параллельные запросы могут быть выполнены с использованием forkJoin, но все наблюдаемые разрешаются вместе,

merge() также будет запускать параллельные запросы, но любой ответ может прийти в любом порядке. С помощью merge () мы не сможем определить, какой ответ пришел от какого Observable. Если у вас есть свобода изменять возвращаемую наблюдаемую и добавлять флаг для указания на индекс наблюдаемой, то вы можете достичь этого с помощью некоторых дополнительных флагов и кода, выглядящего так:

export class AppComponent  {
  name = 'Angular';
  obsOne = of('First Obs').pipe(map((res) => {
    return {
      firstObs: true,
      result: res
    }
  }))
  obsTwo = of('Second Obs').pipe(delay(6000))

  secondObsReturned = false
  timerHandle
  obsSubcription: Subscription;

  ngOnInit() {
    this.obsSubcription = merge(this.obsOne, this.obsTwo).subscribe((data) => {

      // you can add all this logic in pipe(map()) instead of handling in subscribe

      console.log(`data returned`, data)
      // some appropriate checks here
      if (typeof data === 'object' && data.hasOwnProperty('firstObs')) {
        if (!this.secondObsReturned) {
          // can use rxjs timer here
        this.timerHandle = setTimeout(() => {
          console.log('Delayed more than 5 seconds');
          this.obsSubcription.unsubscribe();
        }, 5000)
        }
      }
      else {
        // this is the second onservable (which may have come early)
        this.secondObsReturned = true;
      }
    })
  }
}

См. Пример здесь: https://stackblitz.com/edit/angular-s6wkk2


EDIT

Итак, я думал о каком-то способе избежать изменения возвращаемого Observable, и я придумал CombineLatest. С последним объединением в последний раз он будет ожидать значения в обеих наблюдаемых, после чего он будет излучать, даже если какой-либо из наблюдаемых разрешит.

Чтобы использовать это, снова есть ограничение. Например, вам нужно знать конкретное значение, которое Observables никогда не вернет, скажем, false, поэтому, если вы знаете, что Observables никогда не будут возвращаться false (или любое значение по умолчанию), тогда вы можете использовать BehaviorSubjects и combLatest. Инициализируйте объекты BehaviorSubjects значением, которое невозможно вернуть.

Вам нужно будет нажать на наблюдаемое, чтобы добавить значения к предмету.

// give appropriate types
subjectOne = <any> new BehaviorSubject(false); // will contain value of the first observable
subjectTwo = <any> new BehaviorSubject(false); // will contain value of the second observable
takeUntilSub = new Subject(); // use this to stop the subscriptions

obsOne = of('First Obs')
  .pipe(
    tap((value) => {
      this.subjectOne.next(value);
    }),
    catchError((e) => {
      // if an Error occurs in first then you don't want to proceeed at all
      // add an error in the subjectOne, this will stop the combineLatest stream.
      this.subjectOne.error('Observable one errored')
      return throwError;(e)
    })
  )

obsTwo = of('Second Obs')
  .pipe(
    delay(6000),
    tap((value) => {
      this.subjectTwo.next(value);
    }),
    catchError((e) => {
      // if you want to continue the stream, you need to handle the error and return a success.
      // no need to populate the subject coz you don't care about this error
      return of(e)
    })
  )

secondObsReturned = false
timerHandle;

ngOnInit() {

  // calling the actual Observables here.
  merge(this.obsOne, this.obsTwo).pipe(takeUntil(this.takeUntilSub)).subscribe()

  // this will be called once for the very first time giving values as false for both of them (or the emitted initial values)
  // after that when any one of them resolves, flow will come here
  combineLatest(this.subjectOne, this.subjectTwo).pipe(takeUntil(this.takeUntilSub)).subscribe(([dataFromObsOne, dataFromObsTwo]) => {

    console.log(`data received: ${dataFromObsOne} and ${dataFromObsTwo}`)

    if (dataFromObsTwo !== false) {
      // second observable was resolved
      this.secondObsReturned = true;
      if (this.timerHandle) {
        clearTimeout(this.timerHandle);
      }
    }

    if (dataFromObsOne !== false) {
      // first observable resoved
      if (!this.secondObsReturned) {
        // if second obs hasn't already been resolved then start a timer.
        this.timerHandle = setTimeout(() => {
          console.log('Delayed more than 5 seconds');
          this.takeUntilSub.next(true);   // stop all subscriptions
        }, 5000)
      }
    }
  })
}

См. Пример здесь: Код ссылки .

0 голосов
/ 09 мая 2019

Вы можете использовать оператор forkJoin. Этот оператор используется, когда у вас есть несколько запросов, но вам нужно дождаться ответа на первый запрос, а затем может быть запущен следующий запрос.

Дополнительную информацию можно найти в документации RxJS для оператора forkJoin .

Ниже приведен фрагмент, который я попытался собрать:


constructor(private http: HttpClient)

this.url1 = 'Url 1';
this.url2 = 'Url 2';

public forkJoinExample(): Observable<any> {

   let data_1 = this.http.get(this.url1);
   let data_2 = this.http.get(this.url2);

return forkJoin([data1, data2]);

}

Для добавления ожидания вы можете использовать оператор defer . Это должно быть довольно просто, чтобы реализовать отложенное тоже.

Другим способом реализации этого может быть следующий фрагмент:


   let data_1 = this.http.get(this.url1);
   let data_2 = this.http.get(this.url2);

const observableData = Rx.Observable.forkJoin([data1, data2]);

observableData.subscribe(
res => {
   // Handle response
},
(err) => {
   // Handle Error Scenario
},
() => {
   // Executes when all the requests are completed
});

Существует еще один метод для обработки нескольких блоков ошибок, я могу добавить их, если вы хотите, чтобы я добавил. Вы можете просто присоединиться к отдельным вызовам службы и, наконец, подписаться на окончательный запрос и добавить столько блоков ответа и ошибок в эти отдельные запросы.

...