Rx IObservable буферизация для сглаживания всплесков событий - PullRequest
18 голосов
/ 22 декабря 2010

У меня есть наблюдаемая последовательность, которая генерирует события в быстрых пакетах (то есть: пять событий одно за другим, затем длинная задержка, затем еще один быстрый взрыв событий и т. Д.).Я хочу сгладить эти всплески, вставив небольшую задержку между событиями.Представьте в качестве примера следующую диаграмму:

Raw:      --oooo--------------ooooo-----oo----------------ooo|
Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|

Мой текущий подход заключается в создании метрономного таймера с помощью Observable.Interval(), который сигнализирует, когда можно извлечь другое событие из необработанного потока.Проблема в том, что я не могу понять, как затем объединить этот таймер с моей необработанной небуферизованной наблюдаемой последовательностью.

IObservable.Zip() близок к тому, что я хочу, но он работает только до тех пор, пока не обработан потокпроизводит события быстрее, чем таймер.Как только в необработанном потоке наблюдается значительное затишье, таймер создает серию нежелательных событий, которые затем сразу связываются со следующим всплеском событий из необработанного потока.

В идеале, я хочу IObservableметод расширения со следующей сигнатурой функции, которая производит bevaior, который я обрисовал в общих чертах выше.Теперь придите ко мне на помощь StackOverflow:)

public static IObservable<T> Buffered(this IObservable<T> src, TimeSpan minDelay)

PS.Я новичок в Rx, поэтому приношу свои извинения, если это тривиально простой вопрос ...


1.Простой, но некорректный подход

Вот мое первоначальное наивное и упрощенное решение, которое имеет довольно много проблем:

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
    Queue<T> q = new Queue<T>();
    source.Subscribe(x => q.Enqueue(x));
    return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue());
}

Первая очевидная проблема с этим состоит в том, что IDisposable возвращается внутренней подпиской наНеобработанный источник потерян, и поэтому подписка не может быть прекращена.Вызов Dispose для IDisposable, возвращаемого этим методом, убивает таймер, но не базовый канал необработанных событий, который теперь напрасно заполняет очередь, и никому не остается извлекать события из очереди.

Вторая проблема заключается в том, что нетспособ распространения исключений или уведомлений об окончании потока из необработанного потока событий в буферный поток - они просто игнорируются при подписке на необработанный источник.

И последнее, но не менее важное, теперь яу нас есть код, который периодически просыпается независимо от того, есть ли на самом деле какая-либо работа, которую я предпочел бы избегать в этом замечательном новом реактивном мире.


2.Чрезмерно сложный подход

Чтобы решить проблемы, возникшие в моем первоначальном упрощенном подходе, я написал намного более сложную функцию, которая ведет себя почти так же, как IObservable.Delay() (я использовал .NET Reflector, чтобы прочитать этокод и использовал его в качестве основы моей функции).К сожалению, большая часть стандартной логики, такой как AnonymousObservable, не является общедоступной вне кода system.reactive, поэтому мне пришлось скопировать и вставить lot кода.Это решение, кажется, работает, но, учитывая его сложность, я менее уверен, что оно не содержит ошибок.

Я просто не могу поверить, что не существует способа сделать это, используя некоторую комбинацию стандартного Reactiveрасширения.Я ненавижу чувствовать, будто я без необходимости заново изобретаю колесо, и шаблон, который я пытаюсь создать, кажется довольно стандартным.

1 Ответ

10 голосов
/ 22 декабря 2010

На самом деле это дубликат Способ отправки буферизованных событий через четные интервалы , но я приведу здесь краткое изложение (оригинал выглядит довольно странно, поскольку он рассматривает несколько альтернатив).

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Drain(x => 
        Observable.Empty<int>()
            .Delay(minDelay)
            .StartWith(x)
    );
}

Моя реализация Drain работает как SelectMany, за исключением того, что она ожидает, пока предыдущий вывод завершится первым (вы можете думать о нем как ConactMany, тогда как SelectMany больше похоже на MergeMany). Встроенный Drain не работает таким образом, поэтому вам нужно включить реализацию ниже:

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(
        this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}
...