Изменить интервал RX операторов? - PullRequest
6 голосов
/ 08 сентября 2010

Это может быть глупый вопрос, так как я немного новичок в RX:)

Я выбираю событие (RX для .Net 4.0):

eventAsObservable.Sample (TimeSpan.FromSeconds (1)). Timestamp (). Subscribe (x => Console.WriteLine ("testing:" + x.Value.EventArgs.str));

Проблема в том, что время выборки должно быть в состоянии изменить на лету, я думаю, я мог бы создать какое-то свойство, которое удаляет существующий обработчик и создает новый, когда он изменяется, но он кажется немного грязным и более уязвимым к вопросам времени. Есть ли способ просто изменить интервал?

Пример: скажем, что кто-то пишет строку символов, когда при обнаружении определенной последовательности вы хотите изменить время выборки, не пропуская событие, и, желательно, не получая событие более одного раза

Ответы [ 4 ]

8 голосов
/ 09 сентября 2010

Я не знаю, как изменить существующий интервал выборки, но то, что вы могли бы сделать, - это сэмплирование на самой высокой частоте, которая вам нужна, а затем фильтрация с помощью предложения Where использует переменную, которую вы можете изменить.

Например:

static IObservable<T> SampleEvery<T>(this IObservable<T> source,
    Func<int> multipleProvider)
{
    int counter = 0;
    Func<T, bool> predicate = ignored => {
        counter++;
        if (counter >= multipleProvider())
        {
            counter = 0;
        }
        return counter == 0;
    };
    return source.Where(predicate);
}

Тогда вы бы назвали это так:

// Keep this somewhere you can change it
int multiple = 1;

eventAsObservable.Sample(TimeSpan.FromSeconds(1))
                 .SampleEvery(() => multiple)
                 .Timestamp()
                 .Subscribe(x => Console.WriteLine("testing:" + 
                                                   x.Value.EventArgs.str));

Теперь изменение значения multiple изменит эффективную частоту дискретизации.

Это довольно уродливый хак, но я думаю, что это должно сработать.

6 голосов
/ 13 сентября 2010

Я знаю, что на этот вопрос уже был дан ответ, но я подумал, что добавлю еще несколько способов решения этой проблемы Rx.

Вы можете использовать Switch для последовательности TimeSpan s:

private Subject<TimeSpan> sampleFrequencies = new Subject<TimeSpan>();

sampleFrequencies
    .Select(x => eventAsObservable.Sample(Observable.Interval(x)).Timestamp())
    .Switch()
    .Subscribe(x => .WriteLine("testing:" + x.Value.EventArgs.str));

// To change:
// sampleFrequencies.OnNext(TimeSpan.FromSeconds(5));

В качестве альтернативы, это также может быть решено с использованием Defer, TakeUntil и Repeat (это немного более безумно и включено в качестве упражнения для мысли):

private TimeSpan sampleFrequency = TiemSpan.FromSeconds(2);
private Subject<Unit> frequencyChanged = new Subject<Unit>();

(Observable
    .Defer(() => eventAsObservable
       .Sample(Observable.Interval(sampleFrequency)
    )
    .Timestamp()
    .TakeUntil(frequencyChanged)
).Repeat()
.Subscribe(x => .WriteLine("testing:" + x.Value.EventArgs.str));

// To change: 
// sampleFrequency = TimeSpan.FromSeconds(5);
// frequencyChanged.OnNext(new Unit());
2 голосов
/ 02 ноября 2015

TL; DR: Создать Observable с помощью ObservableFromIntervalFunctor, как показано ниже:

void Main()
{
    // Pick an initial period, it can be changed later.
    var intervalPeriod = TimeSpan.FromSeconds(1);

    // Create an observable using a functor that captures the interval period.
    var o = ObservableFromIntervalFunctor(() => intervalPeriod);

    // Log every value so we can visualize the observable.
    o.Subscribe(Console.WriteLine);

    // Sleep for a while so you can observe the observable.
    Thread.Sleep(TimeSpan.FromSeconds(5.0));

    // Changing the interval period will takes effect on next tick.
    intervalPeriod = TimeSpan.FromSeconds(0.3);

}

IObservable<long> ObservableFromIntervalFunctor(Func<TimeSpan> intervalPeriodFunctor)
{
    return Observable.Generate(0L, s => true, s => s + 1, s => s, s => intervalPeriodFunctor());
}

Объяснение: Observable.Generate имеет перегрузку, позволяющую вам указать время, когда следующее значение будет сгенерировано через функтор. Передавая функтор, который захватил переменную временного интервала, вы можете изменить период observable.interval, изменив захваченную переменную временного интервала.

фрагмент Linqpad здесь

0 голосов
/ 09 сентября 2010

Почему бы вам просто не подписаться дважды?

Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().SelectMany(x => doLocalLookup(x)),
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Timestamp().SelectMany(x => doRemoteLookup(x)),
).Subscribe(Console.WriteLine);

Или, если поиск активен только на основе какого-то префикса или спецификатора, например, в Google Chrome '?'оператор:

Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Where(x => isLocal(x)).SelectMany(x => doLocalLookup(x)),
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Where(x => isARemoteQuery(x).SelectMany(x => doRemoteLookup(x)),
).Subscribe(Console.WriteLine);
...