Как должен работать IObservable <double>. Average? - PullRequest
3 голосов
/ 07 мая 2010

Обновление

Похоже, что Джон Скит был прав (большой сюрприз!), И проблема была в том, что я предположил, что расширение Average обеспечивает непрерывное среднее (это не так).

Для поведения, за которым я следую, я написал простой метод расширения ContinuousAverage, реализацию которого я включил здесь для блага других, которые могут захотеть что-то подобное:

public static class ObservableExtensions {
    private class ContinuousAverager {
            private double _mean;
            private long _count;

        public ContinuousAverager() {
            _mean = 0.0;
            _count = 0L;
        }

        // undecided whether this method needs to be made thread-safe or not
        // seems that ought to be the responsibility of the IObservable (?)
        public double Add(double value) {
            double delta = value - _mean;
            _mean += (delta / (double)(++_count));
            return _mean;
        }
    }

    public static IObservable<double> ContinousAverage(this IObservable<double> source) {
        var averager = new ContinuousAverager();

        return source.Select(x => averager.Add(x));
    }
}

Я думаю о том, чтобы идти вперед и делать что-то подобное выше для других очевидных кандидатов - так, ContinuousCount, ContinuousSum, ContinuousMin, ContinuousMax ... возможно ContinuousVariance иContinuousStandardDeviation а?Есть какие-нибудь мысли по этому поводу?


Оригинальный вопрос

Я использую Rx Extensions немного здесь и там, и чувствую, что у меня есть основные идеи.

Теперь вот что-то странное: у меня сложилось впечатление, что если бы я написал это:

var ticks = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");

var bids = ticks
    .Where(e => e.EventArgs.Quote.HasBid)
    .Select(e => e.EventArgs.Quote.Bid);

var bidsSubscription = bids.Subscribe(
    b => Console.WriteLine("Bid: {0}", b)
);

var avgOfBids = bids.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
    b => Console.WriteLine("Avg Bid: {0}", b)
);

, я бы получил два IObservable<double> объекта (bids и avgOfBids);один будет в основном потоком всех рыночных заявок от моих MarketDataProvider, другой будет потоком средних этих заявок.

Так что-то вроде этого:

Bid    Avg Bid
1      1
2      1.5
1      1.33
2      1.5

Кажется, мой avgOfBids объект ничего не делает.Что мне не хватает?Я думаю, что, вероятно, неправильно понял, что на самом деле должен делать Average.(Это также относится ко всем методам расширения, подобным агрегатному, в IObservable<T> - например, Max, Count и т. Д.)

Ответы [ 3 ]

3 голосов
/ 09 мая 2010

Еще один способ сделать ContinousAverage - использовать .Scan ():

bids.Scan(new { sum = .0, count = 0 }, 
          (agg, x) => new { sum = agg.sum + x, count = agg.count + 1 })
    .Select(agg => agg.sum / agg.count)
2 голосов
/ 08 мая 2010

Агрегированные методы не предоставляют скользящий макс / мин / счет / среднее и т. Д. - они предоставляют значение single после завершения исходного потока. Например, если вы измените свой код на:

var avgOfBids = bids.Take(5).Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
    b => Console.WriteLine("Avg Bid: {0}", b)
);

Затем, после того как было подано 5 заявок, будет показано их среднее значение. «Средний» поток также завершится.

0 голосов
/ 08 мая 2010

После подписки на поток событий ставок; вы фактически заблокировали подписчиков (другие подписчики возвращают IDisposable.)

Вы должны будете определить другое Наблюдаемое параллельно с тиками и подписаться на него, чтобы усреднить его.

var ticksToAverage = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");

 var bidsToAverage = ticksToAverage
      .Where(e => e.EventArgs.Quote.HasBid)
         .Select(e => e.EventArgs.Quote.Bid);


var avgOfBids = bidsToAverage.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe( b => Console.WriteLine("Avg Bid: {0}", b)
                ); 
...