Определить IsAlive на IObservable - PullRequest
2 голосов
/ 28 мая 2011

Я пишу функцию IsAlive, чтобы взять IObservable<T>, интервал времени и вернуть IObservable<bool> Канонический вариант использования - определить, отправляет ли потоковый сервер данные.

Я пришел к следующему решению, но чувствую, что не совсем понятно, как оно работает.

public static IObservable<bool> IsAlive<T>(this IObservable<T> source, 
                                           TimeSpan timeout, 
                                           IScheduler sched)
{
    return source.Window(timeout, sched)
                 .Select(wind => wind.Any())
                 .SelectMany(a => a)
                 .DistinctUntilChanged();
}

У кого-нибудь есть лучший подход?

К вашему сведению -Вот модульные тесты и существующие подходы, которые я попробовал: https://gist.github.com/997003

Ответы [ 2 ]

2 голосов
/ 30 мая 2011

Это должно работать:

public static IObservable<bool> IsAlive<T>(this IObservable<T> source, 
                                           TimeSpan timeout, 
                                           IScheduler sched)
{
    return source.Buffer(timeout, 1, sched)
                 .Select(l => l.Any())
                 .DistinctUntilChanged();
}

Этот подход также имеет смысловой смысл.Каждый раз, когда элемент входит, он заполняет буфер, а затем передается true.И каждый тайм-аут будет создаваться пустой буфер, и будет передаваться false.

Редактировать:

Вот почему подход с буфером-1 лучше, чем оконный:

var sched = new TestScheduler();
var subj = new Subject<Unit>();

var timeout = TimeSpan.FromTicks(10);

subj
    .Buffer(timeout, 1, sched)
    .Select(Enumerable.Any)
    .Subscribe(x => Console.WriteLine("Buffer(timeout, 1): " + x));

subj
    .Window(timeout, sched)
    .Select(wind => wind.Any())
    .SelectMany(a => a)
    .Subscribe(x => Console.WriteLine("Window(timeout): "+x));

sched.AdvanceTo(5);
subj.OnNext(Unit.Default);
sched.AdvanceTo(16);

выход:

Buffer(timeout, 1): True
Window(timeout): True
Buffer(timeout, 1): False

Если говорить точнее, окно открыто на весь тайм-аут и не закрывается и не сбрасывается, как только появляется элемент. Именно здесьв игру вступает ограничение на 1 буфер.Как только элемент входит, буфер и его таймер перезапускаются.

Я мог бы повторно реализовать свой буфер как окно, так как реализация буфера - это окно, но а) я думаю, что буфер имеет смысл лучше семантического и б) мне не нужно SelectMany,Select и SelectMany Скотта могут быть объединены в один SelectMany (x => x.Any ()), но я могу избежать всей лямбды и указать группу методов Enumerable.Any, которая в любом случае будет связываться быстрее (тривиально).

1 голос
/ 28 мая 2011

Как насчет:

source.Select(_ => true)
    .Timeout(timeout, sched)
    .DistinctUntilChanged()
    .Catch<bool, TimeoutException>)(ex => Observable.Return(false));
...