Как я должен обнаружить, что наблюдаемая неактивна и вводить данные каждую минуту? - PullRequest
1 голос
/ 21 декабря 2011

У меня есть наблюдаемое возвращение данных спорадически.Если в течение одной минуты нет данных, мне нужно повторять последние данные каждую минуту, пока они не сгенерируют данные снова.Как мне этого добиться?

Спасибо.

Ответы [ 5 ]

2 голосов
/ 22 декабря 2011

Вот один вкладыш, чтобы делать то, что вы хотите.Я проверил это, и это кажется правильным.

var results = source
    .Publish(xs =>
        xs
            .Select(x =>
                Observable
                    .Interval(TimeSpan.FromMinutes(1.0))
                    .Select(_ => x)
                    .StartWith(x))
            .Switch());

Дайте мне знать, если это удастся.

1 голос
/ 21 декабря 2011

Я бы обернул наблюдаемое в наблюдаемое, которое гарантированно будет возвращать значение не реже одного раза в минуту. Оболочка может сделать это, запустив таймер, который перезапускается всякий раз, когда обернутая наблюдаемая возвращает значение.

Таким образом, оболочка возвращает данные всякий раз, когда обернутая наблюдаемая возвращает данные или когда проходит минута после последнего события.

Остальная часть приложения удобно просто наблюдает за оболочкой.

0 голосов
/ 23 августа 2013

Я думаю, что это должно работать по принципу Rx (без рекурсии, но с побочным эффектом):

    public static IObservable<TSource> RepeatLastValueWhenIdle<TSource>(
        this IObservable<TSource> source,
        TimeSpan idleTime,
        TSource defaultValue = default(TSource))
    {
        TSource lastValue = defaultValue;

        return source
            // memorize the last value on each new 
            .Do(ev => lastValue = ev) 
            // re-publish the last value on timeout
            .Timeout(idleTime,  Observable.Return(lastValue))
            // restart waiting for a new value
            .Repeat(); 
    }
0 голосов
/ 21 декабря 2011

У меня было точно такое же требование один раз.Я решил включить значение по умолчанию для использования, если никакое значение не срабатывает до первого тайм-аута.Вот версия C #:

public static IObservable<T> 
AtLeastEvery<T>(this IObservable<T> source, TimeSpan timeout, 
                T defaultValue, IScheduler scheduler)
{
    if (source == null) throw new ArgumentNullException("source");
    if (scheduler == null) throw new ArgumentNullException("scheduler");
    return Observable.Create<T>(obs =>
        {
            ulong id = 0;
            var gate = new Object();
            var timer = new SerialDisposable();
            T lastValue = defaultValue;

            Action createTimer = () =>
                {
                    ulong startId = id;
                    timer.Disposable = scheduler.Schedule(timeout,
                          self =>
                          {
                              bool noChange;
                              lock (gate)
                              {
                                  noChange = (id == startId);
                                  if (noChange) obs.OnNext(lastValue);
                              }
                              //only restart if no change, otherwise
                              //the change restarted the timeout
                              if (noChange) self(timeout);
                          });
                };
            //start the first timeout
            createTimer();
            var subscription = source.Subscribe(
                v =>
                {
                    lock (gate)
                    {
                        id += 1;
                        lastValue = v;
                    }
                    obs.OnNext(v);
                    createTimer(); //reset the timeout
                },
                ex =>
                {
                    lock (gate)
                    {
                        id += 1; //'cancel' timeout
                    }
                    obs.OnError(ex);
                    //do not reset the timeout, because the sequence has ended
                },
                () =>
                {
                    lock (gate)
                    {
                        id += 1; //'cancel' timeout
                    }
                    obs.OnCompleted();
                    //do not reset the timeout, because the sequence has ended
                });

            return new CompositeDisposable(timer, subscription);
        });
}

Если вы не хотите каждый раз передавать планировщик, просто сделайте перегрузку, которая выбирает одну по умолчанию и делегирует этот метод.Я использовал Scheduler.ThreadPool.

. Этот код работает, используя поведение SerialDisposable, чтобы «отменить» предыдущий вызов тайм-аута, когда приходит новое значение из источника. Есть также счетчик, который я использую в случаетаймер уже истек (в этом случае удаление возврата из расписания не поможет), но метод еще не запущен.

Я исследовал возможности изменения тайм-аутов, но они мне не нужны для решения проблемы, с которой я столкнулся.работа над.Я помню, что это было возможно, но у меня не было под рукой этого кода.

0 голосов
/ 21 декабря 2011

Вот (не многопоточная, в случае, если ваш источник многопоточный) реализация оператора RepeatAfterTimeout:

РЕДАКТИРОВАТЬ Обновлен как Тайм-аут не работает, как я ожидал

// Repeats the last value emitted after a timeout 
public static IObservable<TSource> RepeatAfterTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler)
{
    return Observable.CreateWithDisposable<TSource>(observer =>
    {
        var timer = new MutableDisposable();
        var subscription = new MutableDisposable();

        bool hasValue = false;
        TSource lastValue = default(TSource);

        timer.Disposable = scheduler.Schedule(recurse =>
        {
            if (hasValue)
            {
                observer.OnNext(lastValue);
            }

            recurse();
        });

        subscription.Disposable = source
            .Do(value => { lastValue = value; hasValue = true; })
            .Subscribe(observer);

        return new CompositeDisposable(timer, subscription);
    });
}

public static IObservable<TSource> RepeatAfterTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout)
{
    return source.RepeatAfterTimeout(timeout, Scheduler.TaskPool);
}
...