Как мне обрабатывать в RxJS подпоследовательные URL-вызовы, которые передают данные друг другу? - PullRequest
3 голосов
/ 02 мая 2019

У меня есть звонок в службу, которая потенциально возвращает мне URL-адрес очереди, если сервер занят обработкой ответа.

Я пишу сервис на угловом языке для обработки вызовов такого типа, и я изо всех сил пытаюсь выяснить, какой оператор RXJS 6+ справится с этим для меня.

Это означает, что я хочу сделать запасной вариант: если ответ вернет мне URL-адрес в очереди, я подпишусь на этот вызов и повторю его, пока не получу ответ.

Время ожидания может составлять до 30 секунд (< insert frustrations here>).

Из того, что я могу сказать на странице документации по rxjs, мне нужно использовать оператор concatMap и каким-то образом retry вызов, пока я не получу правильный ответ? Может быть, с некоторыми delay операторами, чтобы ограничить количество звонков?

Я нашел этот фрагмент из https://www.learnrxjs.io/.

Заранее спасибо!

Ответы [ 2 ]

1 голос
/ 02 мая 2019

Это рекурсивная структура вызовов, поэтому вам нужно написать рекурсивную наблюдаемую.Вы не предоставили точные структуры ответа, поэтому я не могу дать точный код, но на высоком уровне это будет выглядеть так:

getQueuedResponse<T>(url) {
  return this.http.get<T>(url).pipe( // fetch the first URL
    switchMap(res => 
      (res.queueUrl) // if queued (your actual queue indicator may be different)
        ? this.getQueuedResponse<T>(res.queueUrl) //then recurse (your actual next url may be different or it may be the original url again)
        : of(res))); // else break (what you actually return here may be different)
}

вы можете добавить задержку, если хотите, с помощью простоготаймер:

getQueuedResponse<T>(url) {
  return this.http.get<T>(url).pipe( // fetch the first URL
    switchMap(res => 
      (res.queueUrl) // if queued, recurse after 5 seconds
        ? timer(5000).pipe(switchMap(t => this.getQueuedResponse<T>(res.queueUrl))
        : of(res))); // else break
}

В качестве альтернативы, если ваши потребности немного отличаются и вы можете просто вызывать один и тот же URL-адрес снова и снова, вы можете увидеть, что это проблема опроса:

pollForResponse<T>(url) {
  return timer(0, 5000).pipe( // start right away then emit every 5 seconds
    switchMap(i => this.http.get<T>(url)), // request the URL
    takeWhile(r => !!r.queued), // keep taking while it's queued
    last() // only emit the last response
  );
}
0 голосов
/ 02 мая 2019

Я вернусь, чтобы отредактировать это, если мое предположение неверно, но предполагается, что вы получаете полезную нагрузку, которая выглядит примерно так, когда вам нужно перенаправить ваш запрос в очередь:

{
    queue: true
    url: "http://myqueue.com"
}

vs когда вы только что получили ответ:

{
    queue: false
    data: { "foo": "bar" }
}

Способ перенаправления вашего запроса может выглядеть следующим образом

http.get<DataModel>('http://the-original-url').pipe(
    switchMap(dataModel=>{
        //did we get queued?
        if (dataModel.queue) {
            //make a subsequent request and retry it 3 times
            return http.get<QueueResponseModel>(dataModel.url).pipe(retry(3))
        } else {
            //return an observable that emits the desired data
            return of(dataModel.data)
        }
    })
)
...