Сами по себе два условия просты, объединить их немного сложно:
Отличается от предыдущего значения:
source.DistintUntilChanged()
Выводится, если в течение 5 секунд тихо:
source
.TimeInterval()
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
Для объединения я бы применил индексатор, объединил два отдельных решения, затем отфильтровал индексатор для удаления дубликатов, который выглядит следующим образом:
var result = source
.Select((val, index) => (val, index)) // Add indexer
.Publish(_indexedSource => Observable.Merge(
_indexedSource
.DistinctUntilChanged(t => t.val),
_indexedSource
.TimeInterval()
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
))
.DistinctUntilChanged(t => t.index) // Filter out items with same index
.Select(t => t.val); // Remove indexing
Если вы хотите протестировать его, вы необходимо передать планировщик в функцию TimeInterval
, которая выглядит следующим образом:
var ts = new TestScheduler();
// The sequence described in question.
var source = ts.CreateHotObservable(
ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(4 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);
// solution
var testResult = source
.Select((val, index) => (val, index))
.Publish(_indexedSource => Observable.Merge(
_indexedSource
.DistinctUntilChanged(t => t.val),
_indexedSource
.TimeInterval(ts)
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
))
.DistinctUntilChanged(t => t.index)
.Select(t => t.val);
// Test assertions
var observer = ts.CreateObserver<int>();
testResult.Subscribe(observer);
var expected = ts.CreateHotObservable(
ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);
ts.Start();
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);