@ Ответ Enigmativity не совсем соответствует спецификации. Это может работать на то, что вы хотите, хотя.
Его ответ определяет 1-секундные окна и берет первое из каждого из этих окон. Это не гарантирует вам ни секунды молчания между предметами. Рассмотрим этот случай:
t : ---------1---------2---------3
source: ------1---2------3---4----5--|
window: ---------|---------|---------|
spec : ------1----------3-----------|
enigma: ------1---2----------4-------|
Ответ подразумевает, что вы хотите одну секунду из ничего после пункта 1. Следующий пункт после этого - 3, с тишиной, затем до конца. Вот этот тест закодирован:
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1100.MsTicks(), 2),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnNext(2200.MsTicks(), 4),
RxTest.OnNext(2600.MsTicks(), 5),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var expectedResults = scheduler.CreateHotObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var target = source
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
.SelectMany(xs => xs.Take(1));
var observer = scheduler.CreateObserver<int>();
target.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
Я думаю, что лучший способ обойти это - решение на основе Scan
с временными метками. По сути, вы храните последнее законное сообщение в памяти с отметкой времени, а если новое сообщение на одну секунду старше, выведите. В противном случае, не:
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
{
return TrueThrottle<T>(source, span, Scheduler.Default);
}
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
{
return source
.Timestamp(scheduler)
.Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
? item
: state
)
.DistinctUntilChanged()
.Select(t => t.Value);
}
Примечание. В тестовом коде используется Nuget Microsoft.Reactive.Testing
и следующий вспомогательный класс:
public static class RxTest
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
}
public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
}
public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
}
}