Тайм-аут Reactive Extensions, который не останавливает последовательность? - PullRequest
5 голосов
/ 09 ноября 2011

Я пытаюсь создать IObservable<bool>, который возвращает true, если UDP-сообщение было получено в течение последних 5 секунд, и если произошел тайм-аут, возвращается false.

Пока у меня есть это:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    var udp = BaseComms.UDPBaseStringListener(localEP)
        .Where(msg => msg.Data.Contains("running"))
        .Select(s => true);

    return Observable
        .Timeout(udp, TimeSpan.FromSeconds(5))
        .Catch(Observable.Return(false));
}

Проблемы с этим: -

  • Как только возвращается ложь, последовательность останавливается
  • Мне действительно нужно true или false при изменении состояния.

Я мог бы использовать Subject<T>, но мне нужно быть осторожным, чтобы избавиться от наблюдаемой UDPBaseStringListener, когда естьподписчиков больше нет.

Обновление

Каждый раз, когда я получаю сообщение UDP, я хочу, чтобы оно возвращало true.Если я не получил UDP-сообщение в течение последних 5 секунд, я хотел бы, чтобы оно вернуло false.

Ответы [ 4 ]

3 голосов
/ 06 августа 2014

Как указано Bj0 , решение с BufferWithTime не вернет точку данных, как только она получена, и время ожидания буфера не будет сброшено после получения точки данных.

С помощью Rx Extensions 2.0 вы можете решить обе проблемы с новой перегрузкой буфера, принимая время ожидания и размер:

static IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    return BaseComms
        .UDPBaseStringListener(localEP)
        .Where(msg => msg.Data.Contains("running"))
        .Buffer(TimeSpan.FromSeconds(5), 1)
        .Select(s => s.Count > 0)
        .DistinctUntilChanged();
}
2 голосов
/ 30 октября 2013

Проблема с буфером заключается в том, что интервал «тайм-аут» не сбрасывается при получении нового значения, окна буфера - это просто отрезки времени (в данном случае 5 с), которые следуют друг за другом.Это означает, что в зависимости от того, когда вы получите ваше последнее значение, вам, возможно, придется подождать, почти вдвое превышающее значение тайм-аута.Это также может пропустить тайм-ауты:

               should timeout here
                         v
0s         5s         10s        15s
|x - x - x | x - - - - | - - - x -| ...
          true        true       true

IObservable.Throttle, однако, сбрасывает себя каждый раз, когда приходит новое значение, и создает значение только после истечения интервала времени (последнее входящее значение).Это можно использовать как тайм-аут и объединить с IObservable для вставки значений «тайм-аут» в поток:

var obs = BaseComms.UDPBaseStringListener(localEP)
            .Where(msg => msg.Data.Contains("running"));

return obs.Merge( obs.Throttle( TimeSpan.FromSeconds(5) )
                        .Select( x => false ) )
            .DistinctUntilChanged();

Рабочий пример LINQPad:

var sub = new Subject<int>();

var script = sub.Timestamp()
    .Merge( sub.Throttle(TimeSpan.FromSeconds(2)).Select( i => -1).Timestamp())
    .Subscribe( x => 
{
    x.Dump("val");
});


Thread.Sleep(1000);

sub.OnNext(1);
sub.OnNext(2);

Thread.Sleep(10000);

sub.OnNext(5);

A -1 вставленв поток через 2 секунды.

1 голос
/ 10 ноября 2011

Если вы не хотите, чтобы последовательность остановилась, просто оберните ее в Defer + Repeat:

Observable.Defer(() => GettingUDPMessages(endpoint)
    .Repeat();
1 голос
/ 09 ноября 2011

Я бы рекомендовал избегать использования Timeout - это вызывает исключения, а кодирование с исключениями - плохо.

Кроме того, кажется, имеет смысл только, что ваша наблюдаемая останавливается после одного значения. Возможно, вам придется объяснить больше о том, что вы хотите, чтобы поведение было.

Мое текущее решение вашей проблемы:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    return Observable.Create<bool>(o =>
    {
        var subject = new AsyncSubject<bool>();
        return new CompositeDisposable(
            Observable.Amb(
                BaseComms
                    .UDPBaseStringListener(localEP)
                    .Where(msg => msg.Data.Contains("running"))
                    .Select(s => true),
                Observable
                    .Timer(TimeSpan.FromMilliseconds(10.0))
                    .Select(_ => false)
            ).Take(1).Subscribe(subject), subject.Subscribe(o));
    });
}

Это помогает?

...