Как я могу удалить какой-либо элемент в ReplaySubject в Rxjs? - PullRequest
0 голосов
/ 15 сентября 2018

Справочная информация:

Я пытаюсь написать службу сообщений с Rxjs и angular 6. Я использую ReplaySubject, чтобы сохранить отправленные сообщения, затем подписчик даже после отправленного сообщения может получить их,Таким образом, сообщение может пересекать различные угловые компоненты.

Я успешно реализовал их, однако я столкнулся с проблемой, заключающейся в том, что для каждого нового подписчика будут получены все сообщения в ReplaySubject, я хочу удалить некоторые конкретные сообщения вReplaySubject.или даже очистить все ReplaySubject.

Вопрос:

  • Как удалить какой-либо элемент внутри ReplaySubject?

  • Если невозможно, есть ли какое-нибудь решение?

  • Если невозможно, есть ли способ создать новый ReplaySubject, затем скопируйтеподписчик на новый ReplaySubject?так что все подписчики могут получать сообщения.

Большое спасибо.

Ответы [ 2 ]

0 голосов
/ 16 сентября 2018

Большое спасибо всем за помощь.

Вот что я сделал:

@Injectable()
export class AlertService {

    public static readonly DEFAULT_SOURCE = '_unknown_source_';
    private _message = new ReplaySubject < AlertMessage > ();
    private _deletedMsg = new Array < number > ();

    constructor() {}

    public sendMessage(source: string = AlertService.DEFAULT_SOURCE,
        message: string,
        alertLevel: AlertLevel = AlertLevel.ERROR,
    ): void {
        const alertMessage: AlertMessage = {
            id: Math.random(),
            source: source,
            message: message,
            level: alertLevel
        };
        this._message.next(alertMessage);
    }

    public getMessage(sources: string[] = [AlertService.DEFAULT_SOURCE],
        alertLevels: AlertLevel[] = [AlertLevel.ERROR, AlertLevel.WARNING, AlertLevel.INFO, AlertLevel.SUCCESS],
    ): Observable < AlertMessage > {
        return this._message.asObservable().pipe(
            filter((msg: AlertMessage) => !this._deletedMsg.includes(msg.id)),
            filter((msg: AlertMessage) => sources.includes(msg.source)),
        filter((msg: AlertMessage) => alertLevels.includes(msg.level)),
    );
    }

    public removeMessage(id: number): void {
        this._deletedMsg.push(id);
    }
}

Поэтому я использую _deletedMsg для обхода.Проблема в том, что _deletedMsg будет продолжать расти так же, как и ReplaySubject.Со временем система будет становиться все медленнее.

0 голосов
/ 15 сентября 2018

Существует множество операторов Rxjs.Возможно, вам понадобятся некоторые операторы преобразования или фильтрации .Простой map() тоже может помочь.

НапримерВы можете вернуть тему воспроизведения и использовать asObservable:

replaySubject.asObservable().filter(i => i > 5 ).subscribe();

...