Реактивные расширения: дроссель / образец с переменным интервалом - PullRequest
7 голосов
/ 16 августа 2010

У меня есть IObservable, который производит значения через случайные интервалы, и я хочу ограничить эту последовательность. Одна вещь, которую я обнаружил, состоит в том, что определение «регулирования» оператора Throttle не совпадает с моим.

Throttle выдает значения только после истечения указанного интервала с тишиной (выдает последнее увиденное значение). Я думал, что регулирование будет означать создание значений с заданным интервалом (если, конечно, нет молчания).

Скажем, я ожидал, что Observable.Interval(100).Select((_,i) => i).Throttle(200) будет производить (по модулю любые проблемы с производительностью / синхронизацией) четные числа, поскольку я уменьшаю его до "половинной скорости". Однако эта последовательность вообще не дает значения, потому что никогда не бывает периода молчания длиной 200.

Итак, я обнаружил, что Sample на самом деле выполняет "регулирование", которое я хочу. Observable.Interval(100).Select((_,i) => i).Sample(200) создает (опять же, по модулю любые проблемы с производительностью / временем) последовательность четных чисел.

Однако у меня есть еще одна проблема: интервал варьируется в зависимости от последнего «выборочного» значения. Я хочу написать оператор, который выглядит следующим образом:

public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector);

Параметр intervalSelector создает интервал для следующей выборки, и первая выборка ... берется либо по первому значению, либо из дополнительного параметра, мне все равно.

Я пытался написать это, но у меня получилась большая извилистая конструкция, которая работала не совсем правильно. У меня вопрос, могу ли я построить это, используя существующие операторы (иначе, с одной строкой)?

Ответы [ 2 ]

5 голосов
/ 17 августа 2010

Много часов спустя, и немного поспав, я понял.

public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector)
{
    return source.TimeInterval()
                 .Scan(Tuple.Create(TimeSpan.Zero, false, default(T)), (acc, v) =>
                 {
                     if(v.Interval >= acc.Item1)
                     {
                         return Tuple.Create(intervalSelector(v.Value), true, v.Value);
                     }
                     return Tuple.Create(acc.Item1 - v.Interval, false, v.Value);
                 })
                 .Where(t => t.Item2)
                 .Select(x => x.Item3);
}

Это работает, как я хочу: каждый раз, когда он выдает значение x, он прекращает производить значения, пока не пройдет intervalSelector(x) время.

0 голосов
/ 16 августа 2010

Не то, что вы ищете для Observable.BufferWithTime ?

...