Я, наконец, выяснил, в чем здесь проблема.
В сценарии № 1 в приведенном выше вопросе код сначала направляет исходный поток в оператор groupBy
, а затем concatMap
оператор.И эта комбинация операторов, кажется, вызывает эту проблему.
Внутренняя работа groupBy
и mergeMap
Считывание кода для оператора groupBy
Я понял, что groupBy
внутренне создает новый экземпляр Subject
для каждого ключа, найденного в исходном потоке.Все значения, принадлежащие этому ключу, немедленно отправляются этим экземпляром Subject
.
Все экземпляры Subject
заключаются в GroupedObservale
с и отправляются в нисходящем направлении оператором groupBy
.Этот поток GroupedObservable
экземпляров является входом для оператора concatMap
.
Оператор concatMap
внутренне вызывает оператор mergeMap
со значением 1 для concurrency
, что означает только один источникobservable подписывается одновременно.
Оператор mergeMap
подписывается только на одну наблюдаемую или столько наблюдаемых, сколько разрешено параметром conccurency
, и хранит все остальные наблюдаемые в «буфере» до первой.завершено.
Как это создает проблему?
Во-первых, теперь, когда я прочитал код для этих операторов, я не слишком уверен, если это «проблема».
Тем не менее поведение, которое я описал в этом вопросе, происходит потому, что, хотя оператор groupBy
генерирует отдельные значения, используя соответствующий экземпляр Subject
, оператор mergeMap
не подписался бы на этот конкретный Subject
.Следовательно, все значения из исходного потока, которые испускаются с использованием этого Subject
, теряются.
Я попытался проиллюстрировать эту проблему на грубой мраморной диаграмме:
Это не «проблема» с тем, как работают эти операторы, но, возможно, с тем, как я понимал эти операторы и, возможно, с документацией (особенно с документацией для concatMap
, которая может немного сбивать с толку тех, кто не знаком с RxJS).
Это можно легко исправить, если заставить оператор groupBy
использовать ReplaySubject
вместо Subject
для выдачи сгруппированных значений.groupBy
принимает параметр subjectSelector
, который позволяет нам переключать экземпляр Subject
с экземпляром ReplaySubject
.
Работает следующий код:
// THIS VERSION WORKS
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();
const result = clicks.pipe(
groupBy(x => x.substr(2,1), null, null, () => new ReplaySubject()),
concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next(x));
// We also need to explicity complete() the source
// stream to ensure that the observable stream for
// the first GroupedObservable completes allowing
// the concatMap operator to move to the second
// GroupedObservable.
clicks.complete();
// Expected and Actual output
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
Почему сценарий2 работа?
Сценарий 2 в моем вопросе работает нормально, потому что interval
просто создает Observable, но не начинает излучать значения.Следовательно, все значения из этого Observable доступны, когда mergeMap
наконец подписывается на него.