Проблема с буфером заключается в том, что интервал «тайм-аут» не сбрасывается при получении нового значения, окна буфера - это просто отрезки времени (в данном случае 5 с), которые следуют друг за другом.Это означает, что в зависимости от того, когда вы получите ваше последнее значение, вам, возможно, придется подождать, почти вдвое превышающее значение тайм-аута.Это также может пропустить тайм-ауты:
should timeout here
v
0s 5s 10s 15s
|x - x - x | x - - - - | - - - x -| ...
true true true
IObservable.Throttle, однако, сбрасывает себя каждый раз, когда приходит новое значение, и создает значение только после истечения интервала времени (последнее входящее значение).Это можно использовать как тайм-аут и объединить с IObservable для вставки значений «тайм-аут» в поток:
var obs = BaseComms.UDPBaseStringListener(localEP)
.Where(msg => msg.Data.Contains("running"));
return obs.Merge( obs.Throttle( TimeSpan.FromSeconds(5) )
.Select( x => false ) )
.DistinctUntilChanged();
Рабочий пример LINQPad:
var sub = new Subject<int>();
var script = sub.Timestamp()
.Merge( sub.Throttle(TimeSpan.FromSeconds(2)).Select( i => -1).Timestamp())
.Subscribe( x =>
{
x.Dump("val");
});
Thread.Sleep(1000);
sub.OnNext(1);
sub.OnNext(2);
Thread.Sleep(10000);
sub.OnNext(5);
A -1 вставленв поток через 2 секунды.