Как я могу замедлить Observable, не выбрасывая значения в RX? - PullRequest
2 голосов
/ 19 июня 2011

Мой сценарий: У меня есть вычисления, которые должны выполняться примерно раз в секунду. После запуска нужно подождать около 200 мсек, чтобы другие вещи наверстали упущенное. Если вычисление все еще продолжается через секунду, оно должно быть запущено во второй раз, но если программа должна подождать, пока оно не закончится, и начать следующее вычисление через 200 мс после завершения.

То, как я делаю это сейчас:

_refreshFinished = new Subject<bool>();
_autoRefresher = Observable.Interval(TimeSpan.FromMilliseconds(1000))
   .Zip(_refreshFinished, (x,y) => x)
   .Subscribe(x => AutoRefresh(stuff));

Проблема с этим кодом заключается в том, что я не вижу способа отложить задержку после завершения вычислений. Метод Delay задерживает только первый элемент наблюдаемой коллекции. Обычно это правильное поведение один раз, так как вам придется буферизовать бесконечное количество элементов, если вы хотите буферизовать всех, но поскольку задержка вызова Autorefesh на 200 мс задерживает вывод _refreshFinished на 200 мс, не будет никаких накладных расходов , По сути, я хочу Observable, который запускает каждый MaxTime (some_call, 1000 мс), затем задерживается на 200 мс или даже лучше, какое-то динамическое значение. На данный момент меня даже не волнуют значения, которые проходят через это, хотя это может измениться в будущем.

Я открыт для любых предложений

Ответы [ 5 ]

3 голосов
/ 20 июня 2011

Observable.Generate() имеет ряд перегрузок, которые позволят вам динамически регулировать время создания следующего элемента.

Например,

IScheduler schd = Scheduler.TaskPool;
var timeout = TimeSpan.FromSeconds(1);
var shortDelay = TimeSpan.FromMilliseconds(200);
var longerDelay = TimeSpan.FromMilliseconds(500);
Observable.Generate(schd.Now, 
                    time => true, 
                    time => schd.Now, 
                    time => new object(), // your code here
                    time => schd.Now.Subtract(time) > timeout  ? shortDelay : longerDelay ,
                    schd);
1 голос
/ 20 июня 2011

Есть способ сделать это.Это не самая простая вещь, так как время ожидания должно быть динамически рассчитано для каждого значения, но оно работает и довольно универсально.

Когда вы используете этот код, вы можете просто вставить код, который должен вызываться в YOURCODEа все остальное работает автоматически.Ваш код будет в основном вызываться для каждого Max (yourCodeTime + extraDelay, normalCallTime + extraDelay).Это означает, что ваш код не будет вызываться дважды одновременно, и приложение всегда будет иметь дополнительную задержку для выполнения других задач.Если есть какой-то более простой / другой способ сделать это, я бы очень хотел это услышать.

double usualCallTime = 1000;
double extraDealy = 100;
var subject = new Subject<double>();
var subscription =
    sub.TimeInterval()
        .Select(x =>
            {
                var processingTime = x.Interval.TotalMilliseconds - x.Value;
                double timeToWait = 
                     Math.Max(0, usualCallTime - processingTime) + extraDelay;
                return Observable.Timer(TimeSpan.FromMilliseconds(timeToWait))
                    .Select(ignore => timeToWait);
            })
        .Switch()
        .Subscribe(x => {YOURCODE();sub.OnNext(x)});
sub.OnNext(0);

private static void YOURCODE()
{
    // do stuff here
    action.Invoke();
}
1 голос
/ 20 июня 2011

Это больше похоже на работу для новой асинхронной структуры http://msdn.microsoft.com/en-us/vstudio/gg316360

0 голосов
/ 09 марта 2015

Предположим, у вас уже есть 'IObservable', тогда будет работать следующее

var delay = TimeSpan.FromSeconds(1.0);
var actual = source.Scan(
    new ConcurrentQueue<object>(),
    (q, i) =>
        {
            q.Enqueue(i);
            return q;
        }).CombineLatest(
            Observable.Interval(delay),
            (q, t) =>
                {
                    object item;
                    if (q.TryDequeue(out item))
                    {
                        return item;
                    }

                    return null;
                }).Where(v => v != null);

'фактический' - ваш наблюдаемый результат. Но имейте в виду, что приведенный выше код превратил это в наблюдаемую Горячую, если она еще не была горячей. Таким образом, вы не получите OnCompleted.

0 голосов
/ 11 июля 2012

Если я правильно понимаю вашу проблему, у вас есть длительная вычислительная функция, такая как:

static String compute()
{
    int t = 300 + new Random().Next(1000);
    Console.Write("[{0}...", t);
    Thread.Sleep(t);
    Console.Write("]");
    return Guid.NewGuid().ToString();
}

И вы хотите вызывать эту функцию по крайней мере один раз в секунду, но без перекрывающихся вызовов и сминимальное время восстановления 200 мс между вызовами.Приведенный ниже код работает для этой ситуации.

Я начал с более функционального подхода (используя Scan() и Timestamp()), более в стиле Rx - потому что я искал хорошее упражнение Rx -но в конце этот неагрегирующий подход был просто проще.

static void Main()
{
    TimeSpan period = TimeSpan.FromMilliseconds(1000);
    TimeSpan recovery = TimeSpan.FromMilliseconds(200);

    Observable
        .Repeat(Unit.Default)
        .Select(_ =>
        {
            var s = DateTimeOffset.Now;
            var x = compute();
            var delay = period - (DateTimeOffset.Now - s);
            if (delay < recovery)
                delay = recovery;

            Console.Write("+{0} ", (int)delay.TotalMilliseconds);

            return Observable.Return(x).Delay(delay).First();
        })
        .Subscribe(Console.WriteLine);
}

Вот вывод:

[1144...]+200 a7cb5d3d-34b9-4d44-95c9-3e363f518e52
[1183...]+200 359ad966-3be7-4027-8b95-1051e3fb20c2
[831...]+200 f433b4dc-d075-49fe-9c84-b790274982d9
[766...]+219 310c9521-7bee-4acc-bbca-81c706a4632a
[505...]+485 0715abfc-db9b-42e2-9ec7-880d7ff58126
[1244...]+200 30a3002a-924a-4a64-9669-095152906d85
[1284...]+200 e5b1cd79-da73-477c-bca0-0870f4b5c640
[354...]+641 a43c9df5-53e8-4b58-a0df-7561cf4b0483
[1094...]+200 8f25019c-77a0-4507-b05e-c9ab8b34bcc3
[993...]+200 840281bd-c8fd-4627-9324-372636f8dea3

[править: в этом примере используется Rx 2.0 (RC) 2.0.20612,0]

...