Я пытаюсь создать условно-подобный объект BehaviorSubject, постоянное состояние которого зависит от сообщения - некоторые сообщения должны оставаться в очереди, а некоторые - нет.
Этот код также должен печатать Fire N1.
import { filter, map, publish, shareReplay, switchMap, share, multicast } from 'rxjs/operators';
import { Observable, Subject, Subscription, iif, of, ConnectableObservable, ReplaySubject } from 'rxjs';
export interface IMessage {
message: string;
retain: boolean;
}
const messageGenerator = new Subject<IMessage>();
const observer = messageGenerator.asObservable().pipe(
switchMap(m => iif(() => m.retain, of(m).pipe(shareReplay(1)), of(m))),
publish()
) as ConnectableObservable<IMessage>;
observer.connect();
// should go for subscriber 1
messageGenerator.next({message: 'Fire N1', retain: true});
observer.subscribe(ev => console.log('Subscriber 1: Received!', ev.message));
// should go for subscriber 1
messageGenerator.next({message: 'Fire N2', retain: false});
observer.subscribe(ev => console.log('Subscriber 2: Received!', ev.message));
// should go for subscribers 1 & 2
messageGenerator.next({message: 'Fire N3', retain: false});