У меня было точно такое же требование один раз.Я решил включить значение по умолчанию для использования, если никакое значение не срабатывает до первого тайм-аута.Вот версия C #:
public static IObservable<T>
AtLeastEvery<T>(this IObservable<T> source, TimeSpan timeout,
T defaultValue, IScheduler scheduler)
{
if (source == null) throw new ArgumentNullException("source");
if (scheduler == null) throw new ArgumentNullException("scheduler");
return Observable.Create<T>(obs =>
{
ulong id = 0;
var gate = new Object();
var timer = new SerialDisposable();
T lastValue = defaultValue;
Action createTimer = () =>
{
ulong startId = id;
timer.Disposable = scheduler.Schedule(timeout,
self =>
{
bool noChange;
lock (gate)
{
noChange = (id == startId);
if (noChange) obs.OnNext(lastValue);
}
//only restart if no change, otherwise
//the change restarted the timeout
if (noChange) self(timeout);
});
};
//start the first timeout
createTimer();
var subscription = source.Subscribe(
v =>
{
lock (gate)
{
id += 1;
lastValue = v;
}
obs.OnNext(v);
createTimer(); //reset the timeout
},
ex =>
{
lock (gate)
{
id += 1; //'cancel' timeout
}
obs.OnError(ex);
//do not reset the timeout, because the sequence has ended
},
() =>
{
lock (gate)
{
id += 1; //'cancel' timeout
}
obs.OnCompleted();
//do not reset the timeout, because the sequence has ended
});
return new CompositeDisposable(timer, subscription);
});
}
Если вы не хотите каждый раз передавать планировщик, просто сделайте перегрузку, которая выбирает одну по умолчанию и делегирует этот метод.Я использовал Scheduler.ThreadPool
.
. Этот код работает, используя поведение SerialDisposable
, чтобы «отменить» предыдущий вызов тайм-аута, когда приходит новое значение из источника. Есть также счетчик, который я использую в случаетаймер уже истек (в этом случае удаление возврата из расписания не поможет), но метод еще не запущен.
Я исследовал возможности изменения тайм-аутов, но они мне не нужны для решения проблемы, с которой я столкнулся.работа над.Я помню, что это было возможно, но у меня не было под рукой этого кода.