Как я могу применить задержку ко всем выбросам из моего ReplaySubject? - PullRequest
0 голосов
/ 26 сентября 2018

Я хочу сделать ReplaySubject в своем приложении Angular, и я хочу настроить его так, чтобы всегда была задержка между тем, когда Наблюдатель подписывается на него и когда он получает обновление.

let delayedReplay = new ReplaySubject(1);

delayedReplay.subscribe((data) => {
  console.log('Got:', data);
});

delayedReplay.next('Test');

Я хочу применить задержку к самому объекту ReplaySubject, чтобы вышеуказанный код регистрировал 'Got: Test', скажем, через 1 секунду.

1 Ответ

0 голосов
/ 26 сентября 2018

Вы должны передать Subject с delay и использовать полученный Observable для подписки, в то время как исходный Subject для излучения.

Это выглядит так:

let replay = new ReplaySubject(1);

let delayedReplay = replay.pipe(delay(1000));

delayedReplay.subscribe((data) => {
  console.log('Got:', data);
});

replay.next('Test');

Это должно работать во всех случаях, но, как указано в этом комментарии , также возможно lift субъекта и приведение результата обратно к субъекту, потому что lift создает экземплярыAnonymousSubject.Это может привести к некоторой дырявой абстракции, поскольку вам необходимо знать реализацию Subject, чтобы убедиться, что приведение типа допустимо.

Если вы хотите использовать его, это решение выглядит следующим образом:

let delayedReplay = <ReplaySubject> new ReplaySubject(1).delay(1000);

// without rxjs-compat in RxJS 6+: 
// let delayedReplay = <ReplaySubject> new ReplaySubject(1).lift(new DelayOperator(1000));

delayedReplay.subscribe((data) => {
    console.log('Got:', data);
});

delayedReplay.next('Test');

Обратите внимание, что lift может быть удалено в RxJS 7.

Теперь, последний вариант - расширение ReplaySubject, поэтому вам не нужно вводить cast.Обратите внимание, что это увеличит связь между вашей реализацией и RxJS и пренебрегает преимуществами составных каналов.

Это может выглядеть так:

class DelayedReplaySubject<T> extends ReplaySubject<T> {
    constructor(buffer: number, private delay: number) {
        super(delay);
    }

    next(value?: T): void {
        of(value)
            .pipe(delay(this.delay))
            .subscribe(val => super.next(val));
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...