Вы должны передать Subject
с delay
и использовать полученный Observable
для подписки, в то время как исходный Subject
для излучения.
Это выглядит так:
let replay = new ReplaySubject(1);
let delayedReplay = replay.pipe(delay(1000));
delayedReplay.subscribe((data) => {
console.log('Got:', data);
});
replay.next('Test');
Это должно работать во всех случаях, но, как указано в этом комментарии , также возможно lift
субъекта и приведение результата обратно к субъекту, потому что lift
создает экземплярыAnonymousSubject
.Это может привести к некоторой дырявой абстракции, поскольку вам необходимо знать реализацию Subject, чтобы убедиться, что приведение типа допустимо.
Если вы хотите использовать его, это решение выглядит следующим образом:
let delayedReplay = <ReplaySubject> new ReplaySubject(1).delay(1000);
// without rxjs-compat in RxJS 6+:
// let delayedReplay = <ReplaySubject> new ReplaySubject(1).lift(new DelayOperator(1000));
delayedReplay.subscribe((data) => {
console.log('Got:', data);
});
delayedReplay.next('Test');
Обратите внимание, что lift
может быть удалено в RxJS 7.
Теперь, последний вариант - расширение ReplaySubject, поэтому вам не нужно вводить cast.Обратите внимание, что это увеличит связь между вашей реализацией и RxJS и пренебрегает преимуществами составных каналов.
Это может выглядеть так:
class DelayedReplaySubject<T> extends ReplaySubject<T> {
constructor(buffer: number, private delay: number) {
super(delay);
}
next(value?: T): void {
of(value)
.pipe(delay(this.delay))
.subscribe(val => super.next(val));
}
}