Emit only уникальные значения - или если определенный период времени прошел с момента последнего emit - PullRequest
0 голосов
/ 17 марта 2020

Учитывая, что у меня есть IObservable<int> values, который получает эти значения (в указанное время):

Time (secs)  Value
1            4
2            3
3            5
4            5
10           5
11           6

Обновление: столбец времени показан только для демонстрации времени, когда значения получил. Observable содержит только простые значения типа int.

Я хочу вывести значение, если:

  1. отличается от предыдущего значения или
  2. больше прошло более 5 секунд с предыдущего значения

Ожидаемый результат: 4, 3, 5, 5, 6

Так что-то вроде:

void Foo(IObservable<int> values)
{
    ...
    values
        .Something()
        .Do(x => Console.WriteLine(x))
        ...
}

Я чувствую что мне нужна какая-то комбинация GroupByUntilChanged() и Throttle(TimeSpan.FromSeconds(5)), но мой Reactive-fu довольно ржавый ...

Ответы [ 2 ]

2 голосов
/ 17 марта 2020

Сами по себе два условия просты, объединить их немного сложно:

Отличается от предыдущего значения:

source.DistintUntilChanged()

Выводится, если в течение 5 секунд тихо:

source
    .TimeInterval()
    .Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
    .Select(timedT => timedT.Value)

Для объединения я бы применил индексатор, объединил два отдельных решения, затем отфильтровал индексатор для удаления дубликатов, который выглядит следующим образом:

var result = source
    .Select((val, index) => (val, index))           // Add indexer
    .Publish(_indexedSource => Observable.Merge(
        _indexedSource
            .DistinctUntilChanged(t => t.val),

        _indexedSource
            .TimeInterval()
            .Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
            .Select(timedT => timedT.Value)
    ))
    .DistinctUntilChanged(t => t.index)            // Filter out items with same index
    .Select(t => t.val);                           // Remove indexing

Если вы хотите протестировать его, вы необходимо передать планировщик в функцию TimeInterval, которая выглядит следующим образом:

var ts = new TestScheduler();

// The sequence described in question.
var source = ts.CreateHotObservable(
    ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
    ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
    ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
    ReactiveTest.OnNext(4 * TimeSpan.TicksPerSecond, 5),
    ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
    ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
    ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);

// solution
var testResult = source
    .Select((val, index) => (val, index))
    .Publish(_indexedSource => Observable.Merge(
        _indexedSource
            .DistinctUntilChanged(t => t.val),
        _indexedSource
            .TimeInterval(ts)
            .Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
            .Select(timedT => timedT.Value)
    ))
    .DistinctUntilChanged(t => t.index)
    .Select(t => t.val);

// Test assertions
var observer = ts.CreateObserver<int>();
testResult.Subscribe(observer);

var expected = ts.CreateHotObservable(
    ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
    ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
    ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
    ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
    ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
    ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);
ts.Start();
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);
0 голосов
/ 18 марта 2020

При ближайшем рассмотрении один из моих коллег напомнил мне о силе .Scan () и DistinctUntilChanged () в сочетании. Поэтому для потомков я добавлю и это решение здесь:

var result = source
    .TimeInterval(scheduler)
    .Scan((prev, curr) =>
    {
        if (prev.Value != curr.Value || curr.Interval > TimeSpan.FromSeconds(5))
            return curr;
        else
            return prev;
    })
    .Select(x => x.Value)
    .DistinctUntilChanged();

Reactive Extensions действительно следует старому девизу Perl TIMTOWTDI. ; -)

...