Это должно работать:
public static IObservable<bool> IsAlive<T>(this IObservable<T> source,
TimeSpan timeout,
IScheduler sched)
{
return source.Buffer(timeout, 1, sched)
.Select(l => l.Any())
.DistinctUntilChanged();
}
Этот подход также имеет смысловой смысл.Каждый раз, когда элемент входит, он заполняет буфер, а затем передается true.И каждый тайм-аут будет создаваться пустой буфер, и будет передаваться false.
Редактировать:
Вот почему подход с буфером-1 лучше, чем оконный:
var sched = new TestScheduler();
var subj = new Subject<Unit>();
var timeout = TimeSpan.FromTicks(10);
subj
.Buffer(timeout, 1, sched)
.Select(Enumerable.Any)
.Subscribe(x => Console.WriteLine("Buffer(timeout, 1): " + x));
subj
.Window(timeout, sched)
.Select(wind => wind.Any())
.SelectMany(a => a)
.Subscribe(x => Console.WriteLine("Window(timeout): "+x));
sched.AdvanceTo(5);
subj.OnNext(Unit.Default);
sched.AdvanceTo(16);
выход:
Buffer(timeout, 1): True
Window(timeout): True
Buffer(timeout, 1): False
Если говорить точнее, окно открыто на весь тайм-аут и не закрывается и не сбрасывается, как только появляется элемент. Именно здесьв игру вступает ограничение на 1 буфер.Как только элемент входит, буфер и его таймер перезапускаются.
Я мог бы повторно реализовать свой буфер как окно, так как реализация буфера - это окно, но а) я думаю, что буфер имеет смысл лучше семантического и б) мне не нужно SelectMany,Select и SelectMany Скотта могут быть объединены в один SelectMany (x => x.Any ()), но я могу избежать всей лямбды и указать группу методов Enumerable.Any, которая в любом случае будет связываться быстрее (тривиально).