У меня есть список хостов, которые я хочу изменить при жизни приложения. Я хочу получить ответ от каждого хоста, как только он будет доступен. Если список хостов изменится, мне наплевать на предыдущие ответы от http.get. Я хочу собрать все ответы хостов в одном массиве и подписаться на него.
Проблема, с которой я сталкиваюсь сейчас, заключается в том, что при изменении списка хостов внутренняя наблюдаемая, возвращаемая из http.get, не отписался от. Я хочу, чтобы запрос хоста был отменен, если этот хост требует много времени для возврата, а у внешних хостов $ subject есть следующее значение.
Новый список хостов создается с хостами $ .next ([... ])
const accumulator = (acc, val) => [...acc, val]
hosts$ = new BehaviorSubject(['host1', 'slow2'])
doer = <T>(): Observable<T> =>
this.hosts$.pipe( // take an array of hosts
switchMap(hosts => fromArray(hosts).pipe( // create an array of observable events for each
mergeMap(host => this.http.get(`http://${host}`).pipe( // hit all the urls at the same time
catchError(() => []) // on 404 or 500 just give back an empty array
)),
startWith([]), // subscribers will see an empty list until the first response
scan(accumulator), // every response will be accumulated
shareReplay({
bufferSize: 1, // allow new subscriptions to get the current value
refCount: true // but when the last subscription is removed, start over with []
})
)) // switch map unsubscribes from the previous host list if the host list changes
)
hosts$.subscribe() // here we have the array of responses seen so far as soon
// as they are available
// assuming slow2 is a host that takes forever, and we change the host list...
hosts$.next(['host3']) // here, the get request to slow2 is not cancelled
// the output of the observable in doer() is correct,
// but the get request should have been cancelled
Я пытался отредактировать приведенный выше код, чтобы использовать switchMap вместо mergeMap, чтобы позволить мне отменить предыдущий запрос http.get, но теперь единственные значения, которые поступают, - это последние хосты get.
Как указано в комментариях, мы могли бы добавить новую наблюдаемую информацию, которая представляет «текущую версию списка хостов», и использовать ее в канале http.get, и принять до изменения списка хостов. Теперь каждый раз, когда обновляется список хостов, мы отправляем событие на эту новую наблюдаемую. Это заставляет меня думать о хорошем имени для оператора трубопровода, которого я ищу: obserbale.pipe(takeUntilNext(subject))