MaybeSubject: условно применить shareReplay () к внутренней наблюдаемой - PullRequest
0 голосов
/ 10 марта 2020

Я пытаюсь создать условно-подобный объект BehaviorSubject, постоянное состояние которого зависит от сообщения - некоторые сообщения должны оставаться в очереди, а некоторые - нет.

Этот код также должен печатать Fire N1.

import { filter, map, publish, shareReplay, switchMap, share, multicast } from 'rxjs/operators';
import { Observable, Subject, Subscription, iif, of, ConnectableObservable, ReplaySubject } from 'rxjs';


export interface IMessage {
  message: string;
  retain: boolean;
}

 const messageGenerator = new Subject<IMessage>();

 const observer = messageGenerator.asObservable().pipe(
   switchMap(m => iif(() => m.retain, of(m).pipe(shareReplay(1)), of(m))),   
   publish()
) as ConnectableObservable<IMessage>;

observer.connect();

// should go for subscriber 1
messageGenerator.next({message: 'Fire N1', retain: true});

observer.subscribe(ev => console.log('Subscriber 1: Received!', ev.message));

// should go for subscriber 1
messageGenerator.next({message: 'Fire N2', retain: false});

observer.subscribe(ev => console.log('Subscriber 2: Received!', ev.message));

// should go for subscribers 1 & 2
messageGenerator.next({message: 'Fire N3', retain: false});
...