Rx Js оператор, который ведет себя как withLatestFrom, но ожидает значения второго потока - PullRequest
2 голосов
/ 24 апреля 2020

Привет! Я ищу оператор Rx Js, который ведет себя аналогично withLatestFrom, за исключением того, что он будет ждать, пока второй поток выдаст значение, а не пропустит его. Чтобы быть яснее: я хочу выбросы только тогда, когда первый поток излучает новое значение.

Так что вместо:

---A-----B----C-----D-|
------1----------2----|
   withLatestFrom
---------B1---C1----D2|

Я хочу это поведение:

---A-----B----C-----D-|
------1----------2----|
   ?????????????
------A1-B1---C1----D2|

Есть ли для этого оператор?

Ответы [ 2 ]

2 голосов
/ 25 апреля 2020

Я не думаю, что есть оператор, который делает именно это, но вы можете достичь этих результатов, комбинируя оператор отображения высокого порядка и Subject:

second$ = second$.pipe(shareReplay({ bufferSize: 1, refCount: false }));

first$.pipe(
  concatMap(
    firstVal => second$.pipe(
      map(secondVal =>  `${firstVal}${secondVal}`),
      take(1),
    )
  )
)

shareReplay ставит ReplaySubject впереди по данным производителя. Это означает, что он будет отвечать последним N (bufferSize) значениям каждому новому подписчику . refCount гарантирует, что если активных подписчиков больше не будет, ReplaySubject, используемый , не будет уничтожен .

Я решил использовать concatMap, поскольку я думаю, что это безопаснее для ReplaySubject, чтобы иметь только один активный подписчик.

Учитывая эту схему:

---A-----B----C-----D-| first$
------1----------2----| second$

Когда A входит, ReplaySubject (от shareReplay) получит нового подписчика, а A будет ждать, пока не будет получено second$. Когда это произойдет, вы получите A1, и внутренняя наблюдаемая будет завершена (это означает, что ее подписчик будет удален из списка подписчиков ReplaySubject). 1 будет кэшироваться ReplaySubject.

. Затем приходит B, вновь созданный внутренний подписчик подпишется на second$ и немедленно получит 1, в результате чего B1. То же самое с C.

Теперь важная часть: у ReplaySubject не должно быть активных подписчиков, когда он получает новое значение из своего источника, поэтому я выбрал take(1). Когда придет 2, у ReplaySubject не будет активных подписчиков, поэтому ничего не происходит.

Затем прибывает D и получает последнее сохраненное значение, 2, в результате чего D2.

0 голосов
/ 24 апреля 2020

Да, это присоединиться . Из вашего примера не ясно, собираетесь ли вы в левое / правое соединение или во внутреннее соединение.

О, извините. Это действительно понятно. Вам нужна семантика внутреннего соединения (выдает, если данные в обоих соединениях присутствуют и в противном случае ожидают).

...