Я не думаю, что есть оператор, который делает именно это, но вы можете достичь этих результатов, комбинируя оператор отображения высокого порядка и Subject
:
second$ = second$.pipe(shareReplay({ bufferSize: 1, refCount: false }));
first$.pipe(
concatMap(
firstVal => second$.pipe(
map(secondVal => `${firstVal}${secondVal}`),
take(1),
)
)
)
shareReplay
ставит ReplaySubject
впереди по данным производителя. Это означает, что он будет отвечать последним N
(bufferSize
) значениям каждому новому подписчику . refCount
гарантирует, что если активных подписчиков больше не будет, ReplaySubject
, используемый , не будет уничтожен .
Я решил использовать concatMap
, поскольку я думаю, что это безопаснее для ReplaySubject
, чтобы иметь только один активный подписчик.
Учитывая эту схему:
---A-----B----C-----D-| first$
------1----------2----| second$
Когда A
входит, ReplaySubject
(от shareReplay
) получит нового подписчика, а A
будет ждать, пока не будет получено second$
. Когда это произойдет, вы получите A1
, и внутренняя наблюдаемая будет завершена (это означает, что ее подписчик будет удален из списка подписчиков ReplaySubject
). 1
будет кэшироваться ReplaySubject
.
. Затем приходит B
, вновь созданный внутренний подписчик подпишется на second$
и немедленно получит 1
, в результате чего B1
. То же самое с C
.
Теперь важная часть: у ReplaySubject
не должно быть активных подписчиков, когда он получает новое значение из своего источника, поэтому я выбрал take(1)
. Когда придет 2
, у ReplaySubject
не будет активных подписчиков, поэтому ничего не происходит.
Затем прибывает D
и получает последнее сохраненное значение, 2
, в результате чего D2
.