есть ли оператор канала, который будет отписываться от внутренней наблюдаемой от внутренних наблюдаемых? - PullRequest
1 голос
/ 08 апреля 2020

У меня есть список хостов, которые я хочу изменить при жизни приложения. Я хочу получить ответ от каждого хоста, как только он будет доступен. Если список хостов изменится, мне наплевать на предыдущие ответы от http.get. Я хочу собрать все ответы хостов в одном массиве и подписаться на него.

Проблема, с которой я сталкиваюсь сейчас, заключается в том, что при изменении списка хостов внутренняя наблюдаемая, возвращаемая из http.get, не отписался от. Я хочу, чтобы запрос хоста был отменен, если этот хост требует много времени для возврата, а у внешних хостов $ subject есть следующее значение.

Новый список хостов создается с хостами $ .next ([... ])


  const accumulator = (acc, val) => [...acc, val]

  hosts$ = new BehaviorSubject(['host1', 'slow2'])

  doer = <T>(): Observable<T> =>
    this.hosts$.pipe( // take an array of hosts
      switchMap(hosts => fromArray(hosts).pipe( // create an array of observable events for each
        mergeMap(host => this.http.get(`http://${host}`).pipe( // hit all the urls at the same time
          catchError(() => []) // on 404 or 500 just give back an empty array
        )),
        startWith([]), // subscribers will see an empty list until the first response
        scan(accumulator), // every response will be accumulated
        shareReplay({
          bufferSize: 1, // allow new subscriptions to get the current value
          refCount: true // but when the last subscription is removed, start over with []
        })
      )) // switch map unsubscribes from the previous host list if the host list changes
    )

  hosts$.subscribe() // here we have the array of responses seen so far as soon
                     // as they are available

  // assuming slow2 is a host that takes forever, and we change the host list...

  hosts$.next(['host3']) // here, the get request to slow2 is not cancelled

  // the output of the observable in doer() is correct,
  //   but the get request should have been cancelled

Я пытался отредактировать приведенный выше код, чтобы использовать switchMap вместо mergeMap, чтобы позволить мне отменить предыдущий запрос http.get, но теперь единственные значения, которые поступают, - это последние хосты get.

Как указано в комментариях, мы могли бы добавить новую наблюдаемую информацию, которая представляет «текущую версию списка хостов», и использовать ее в канале http.get, и принять до изменения списка хостов. Теперь каждый раз, когда обновляется список хостов, мы отправляем событие на эту новую наблюдаемую. Это заставляет меня думать о хорошем имени для оператора трубопровода, которого я ищу: obserbale.pipe(takeUntilNext(subject))

1 Ответ

2 голосов
/ 08 апреля 2020

кажется, что это можно просто очистить следующим образом:

используйте эту наблюдаемую утилиту:

/*
  takes an array of observables and merges them and accumulates emissions in an array as they arrive
*/
const mergeJoin = <T>(obs$: Array<Observable<T>>): Observable<Array<T>> => {
 return merge(...obs$).pipe(
   scan((acc, val) => [...acc, val], [])
 )
}

и сделайте:

  private getHost(host) {
    return this.http.get(`http://${host}`).pipe(
      catchError(() => [])  // is the response an array? is that what you want for your error case?
    )
  }

  doer = <T>(): Observable<T> =>
    this.hosts$.pipe(
      switchMap(hosts => mergeJoin(hosts.map(host => this.getHost(host)))),
      startWith([]),
      shareReplay({bufferSize: 1, refCount: true})
    )

теперь у вас есть doer написано как функция, что является странным решением для наблюдаемой с прикрепленным к ней shareReplay, поскольку она будет создавать новый поток независимо от того, что вызывается при каждой функции, и перезапускать выборку хоста каждый раз, когда она подписана, рендеринг shareReplay не так полезно. Я предполагаю, что у вас есть свои причины, но если это не ваша цель, просто сделайте это c наблюдаемым вместо функции.

...