Способ выдвигать буферизованные события в четные интервалы - PullRequest
12 голосов
/ 08 ноября 2010

То, чего я пытаюсь добиться, - это буферизовать входящие события от некоторого IObservable (они приходят пакетами) и высвобождать их дальше, но один за другим, через равные интервалы. Как это:

-oo-ooo-oo------------------oooo-oo-o-------------->

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->

Так как я совсем новичок в Rx , я не уверен, есть ли уже субъект или оператор, который делает это. Может быть, это можно сделать по композиции?

Обновление:

Благодаря Ричард Сзалай для указания на оператор Drain , я нашел еще один пример Джеймса Майлза об использовании оператора Drain. Вот как мне удалось заставить его работать в приложении WPF:

    .Drain(x => {
        Process(x);
        return Observable.Return(new Unit())
            .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
    }).Subscribe();

Мне было весело, потому что пропуск параметра планировщика приводит к сбою приложения в режиме отладки без каких-либо исключений (мне нужно узнать, как работать с исключениями в Rx). Метод Process напрямую изменяет состояние пользовательского интерфейса, но я думаю, довольно просто сделать из него IObservable (используя ISubject?).

обновление:

Тем временем я экспериментировал с ISubject, класс ниже делает то, что я хотел - он своевременно выпускает буферизованные Ts:

public class StepSubject<T> : ISubject<T>
{
    IObserver<T> subscriber;
    Queue<T> queue = new Queue<T>();
    MutableDisposable cancel = new MutableDisposable();
    TimeSpan interval;
    IScheduler scheduler;
    bool idle = true;

    public StepSubject(TimeSpan interval, IScheduler scheduler)
    {
        this.interval = interval;
        this.scheduler = scheduler;
    }

    void Step()
    {
        T next;
        lock (queue)
        {
            idle = queue.Count == 0;
            if (!idle)
                next = queue.Dequeue();
        }

        if (!idle)
        {
            cancel.Disposable = scheduler.Schedule(Step, interval);
            subscriber.OnNext(next);
        }
    }

    public void OnNext(T value)
    {
        lock (queue)
            queue.Enqueue(value);

        if (idle)
            cancel.Disposable = scheduler.Schedule(Step);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        subscriber = observer;
        return cancel;
    }
}

Для наивности эта наивная реализация удалена из OnCompleted и OnError, также допускается только одна подписка.

Ответы [ 3 ]

11 голосов
/ 08 ноября 2010

Это на самом деле сложнее, чем кажется.

Использование Delay не работает, потому что значения все равно будут массовыми, только с небольшой задержкой.

Использование Interval с любым из них CombineLatest или Zip не работает, поскольку первое приведет к пропуску исходных значений, а второе - к интервалу значений буфера.

Я думаю, что новый оператор Drain ( добавлен в 1.0.2787.0 ) в сочетании с Delay должны сделать трюк:

source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));

Оператор Drain работает как SelectMany, но ожидает завершения предыдущего вывода, прежде чем вызывать селектор с помощьюследующее значение Это все еще не точно того, что вы ищете (первое значение в блоке также будет отложено), но оно близко: Использование выше соответствует вашей мраморной диаграмме.

Редактировать: Очевидно, Drain в рамках не работает, как SelectMany.Я попрошу несколько советов на официальных форумах.А пока вот реализация Drain, которая делает то, что вам нужно:

Редактировать 09/11: Исправлены ошибки в реализации и обновлено использование в соответствии с запрошенной мраморной диаграммой.

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}
2 голосов
/ 29 ноября 2010

Вот как я это сделал, просто используя явную очередь (ReactiveCollection - это просто модная версия ObservableCollection WPF - ReactiveCollection.ItemsAdded OnNext для каждого добавленного элемента, как вы можете себе представить):

https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309

public static ReactiveCollection<T> CreateCollection<T>(this IObservable<T> FromObservable, TimeSpan? WithDelay = null)
{
    var ret = new ReactiveCollection<T>();
    if (WithDelay == null) {
        FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add);
        return ret;
    }

    // On a timer, dequeue items from queue if they are available
    var queue = new Queue<T>();
    var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value)
        .ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => {
            if (queue.Count > 0) { 
                ret.Add(queue.Dequeue());
            }
        });

    // When new items come in from the observable, stuff them in the queue.
    // Using the DeferredScheduler guarantees we'll always access the queue
    // from the same thread.
    FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue);

    // This is a bit clever - keep a running count of the items actually 
    // added and compare them to the final count of items provided by the
    // Observable. Combine the two values, and when they're equal, 
    // disconnect the timer
    ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1), 
        (l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose());

    return ret;
}
2 голосов
/ 10 ноября 2010

Просто для полноты здесь приведена альтернативная (более компактная) версия метода Drain (), предложенная Ричардом:

public static IObservable<T2> SelectManySequential<T1, T2>(
    this IObservable<T1> source, 
    Func<T1, IObservable<T2>> selector
)
{
    return source
        .Select(x => Observable.Defer<T2>(() => selector(x)))
        .Concat();
}

Смотрите тему Drain + SelectMany =? в форуме Rx.

Обновление: Я понял, что перегрузка Concat (), которую я использовал, была одним из моих личных расширений Rx, которые (еще) являются частью фреймворка. Я прошу прощения за эту ошибку .. Конечно, это делает мое решение менее элегантным, чем я думал.

Тем не менее, для полноты я публикую здесь свою перегрузку метода расширения Conact ():

public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
{
    return Observable.CreateWithDisposable<T>(o =>
    {
        var lockCookie = new Object();
        bool completed = false;
        bool subscribed = false;
        var waiting = new Queue<IObservable<T>>();
        var pendingSubscription = new MutableDisposable();

        Action<Exception> errorHandler = e =>
        {
            o.OnError(e);
            pendingSubscription.Dispose();
        };

        Func<IObservable<T>, IDisposable> subscribe = null;
        subscribe = (ob) =>
        {
            subscribed = true;
            return ob.Subscribe(
                o.OnNext,
                errorHandler,
                () =>
                {
                    lock (lockCookie)
                    {
                        if (waiting.Count > 0)
                            pendingSubscription.Disposable = subscribe(waiting.Dequeue());
                        else if (completed)
                            o.OnCompleted();
                        else
                            subscribed = false;
                    }
                }
            );
        };

        return new CompositeDisposable(pendingSubscription,
            source.Subscribe(
                n =>
                {
                    lock (lockCookie)
                    {
                        if (!subscribed)
                            pendingSubscription.Disposable = subscribe(n);
                        else
                            waiting.Enqueue(n);
                    }

                },
                errorHandler
                , () =>
                {
                    lock (lockCookie)
                    {
                        completed = true;
                        if (!subscribed)
                            o.OnCompleted();
                    }
                }
            )
        );
    });
}

А теперь бей себя собственным оружием: Тот же самый метод Concat () мог бы быть написан гораздо более элегантно блестящим способом Ричарда Сзалая:

public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Defer(() =>
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
        return source
            .Zip(queue, (v, q) => v)
            .SelectMany(v => 
                v.Do(_ => { }, () => queue.OnNext(new Unit()))
            );
    });
}

Так что кредит принадлежит Ричарду. : -)

...