То, что у вас есть, хорошо, но как и все RxJS, но дьявол кроется в деталях.
Проблемы
-
switchMap
ing
mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
Здесь вы впервые ошибетесь.Используя здесь карту слияния, вы лишаете возможности сообщать appart «поток запросов» из «потока, возвращенного одним запросом»:
- Вы почти не можете отказаться от подпискииндивидуальный запрос (отменить его)
- Вы делаете невозможным обработку ошибок
- Он падает, если ваша внутренняя наблюдаемая излучает более одного раза.
Скореевсе, что вы хотите - это испускать отдельные BatchEvent
с, через обычные map
(производя наблюдаемые из наблюдаемых), и switchMap
/ mergeMap
те после фильтрации.
Побочные эффекты при создании наблюдаемой и излучающей перед подпиской
userToFetch$.next(userId)
return observable
Не делайте этого.Наблюдаемое само по себе ничего не делает.Это «план» для последовательности действий, которые происходят , когда вы подписываетесь на него.Сделав это, вы создадите пакетное действие только для создания, наблюдаемого, но вы облажаетесь, если вы получаете несколько или отложенные подписки.
Скорее, вы хотите создать наблюдаемое из defer
, которое излучает вuserToFetch$
на каждую подписку.
Даже тогда вы захотите подписаться на свою наблюдаемую до , отправляющую на userToFetch
: если вы не подписаны, ваша наблюдаемая не слушаетпредмет, и событие будет потеряно.Вы можете сделать это в виде, похожем на отложенное.
Решение
Короткое и не сильно отличающееся от вашего кода, но структурируйте его следующим образом.
const BUFFER_TIME = 1000;
type BatchEvent = { keys: Set<string>, values: Observable<Users> };
/** The incomming keys */
const keySubject = new Subject<string>();
const requests: Observable<{ keys: Set<string>, values: Observable<Users> }> =
this.keySubject.asObservable().pipe(
bufferTime(BUFFER_TIME),
map(keys => this.fetchBatch(keys)),
share(),
);
/** Returns a single User from an ID. Batches the request */
function get(userId: string): Observable<User> {
console.log("Creating observable for:", userId);
// The money observable. See "defer":
// triggers a new subject event on subscription
const observable = new Observable<BatchEvent>(observer => {
this.requests.subscribe(observer);
// Emit *after* the subscription
this.keySubject.next(userId);
});
return observable.pipe(
first(v => v.keys.has(userId)),
// There is only 1 item, so any *Map will do here
switchMap(v => v.values),
map(v => v[userId]),
);
}
function fetchBatch(args: string[]): BatchEvent {
const keys = new Set(args); // Do not batch duplicates
const values = this.userService.get(Array.from(keys)).pipe(
share(),
);
return { keys, values };
}
Этоделает именно то, что вы просили, в том числе:
- Ошибки передаются получателям пакетного вызова, но никто другой
- Если все отписываются от пакета, наблюдаемое отменяется
- Если все отписываются от пакета до того, как запрос будет даже запущен, он никогда не запускается
- Наблюдаемое ведет себя как HttpClient: подписывается на наблюдаемые события нового (пакетного) запросадля данных.Абоненты могут свободно
shareReplay
или что-то еще, хотя.Так что никаких сюрпризов там нет.
Вот рабочая строблитц Угловая демонстрация: https://stackblitz.com/edit/angular-rxjs-batch-request
В частности, обратите внимание на поведение, когда вы «переключаете» дисплей: вы заметите, чтоповторная подписка на существующие наблюдаемые вызовет запуск новых пакетных запросов, и эти запросы будут отменены (или сразу не запущены), если вы переключите их достаточно быстро.
Вариант использования
В нашем проектемы используем это для угловых таблиц, где каждая строка должна отдельно выбирать дополнительные данные для рендеринга.Это позволяет нам:
- разбивать все запросы на «одну страницу», не требуя специальных знаний о разбиении на страницы
- Потенциально извлекать сразу несколько страниц, если пользователь быстро разбивает на страницы
- повторно использовать существующие результаты, даже если размер страницы изменяется
Ограничения
Я бы не стал добавлять в это чанкинг или ограничение скорости.Поскольку наблюдаемый источник - тупой bufferTime
, у вас возникают проблемы:
- "Чанкинг" произойдет до дедупликации.Поэтому, если у вас есть 100 запросов на один идентификатор пользователя, вы в конечном итоге вызовете несколько запросов только с одним элементом
- Если вы ограничите скорость, вы не сможете проверить свою очередь.Таким образом, вы можете получить очень длинную очередь, содержащую несколько одинаковых запросов.
Хотя это пессимистическая точка зрения.Исправить это означало бы полное заполнение с помощью механизма очереди / пакета с сохранением состояния, который на порядок сложнее.