Обновление
Похоже, что Джон Скит был прав (большой сюрприз!), И проблема была в том, что я предположил, что расширение Average
обеспечивает непрерывное среднее (это не так).
Для поведения, за которым я следую, я написал простой метод расширения ContinuousAverage
, реализацию которого я включил здесь для блага других, которые могут захотеть что-то подобное:
public static class ObservableExtensions {
private class ContinuousAverager {
private double _mean;
private long _count;
public ContinuousAverager() {
_mean = 0.0;
_count = 0L;
}
// undecided whether this method needs to be made thread-safe or not
// seems that ought to be the responsibility of the IObservable (?)
public double Add(double value) {
double delta = value - _mean;
_mean += (delta / (double)(++_count));
return _mean;
}
}
public static IObservable<double> ContinousAverage(this IObservable<double> source) {
var averager = new ContinuousAverager();
return source.Select(x => averager.Add(x));
}
}
Я думаю о том, чтобы идти вперед и делать что-то подобное выше для других очевидных кандидатов - так, ContinuousCount
, ContinuousSum
, ContinuousMin
, ContinuousMax
... возможно ContinuousVariance
иContinuousStandardDeviation
а?Есть какие-нибудь мысли по этому поводу?
Оригинальный вопрос
Я использую Rx Extensions немного здесь и там, и чувствую, что у меня есть основные идеи.
Теперь вот что-то странное: у меня сложилось впечатление, что если бы я написал это:
var ticks = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");
var bids = ticks
.Where(e => e.EventArgs.Quote.HasBid)
.Select(e => e.EventArgs.Quote.Bid);
var bidsSubscription = bids.Subscribe(
b => Console.WriteLine("Bid: {0}", b)
);
var avgOfBids = bids.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
b => Console.WriteLine("Avg Bid: {0}", b)
);
, я бы получил два IObservable<double>
объекта (bids
и avgOfBids
);один будет в основном потоком всех рыночных заявок от моих MarketDataProvider
, другой будет потоком средних этих заявок.
Так что-то вроде этого:
Bid Avg Bid
1 1
2 1.5
1 1.33
2 1.5
Кажется, мой avgOfBids
объект ничего не делает.Что мне не хватает?Я думаю, что, вероятно, неправильно понял, что на самом деле должен делать Average
.(Это также относится ко всем методам расширения, подобным агрегатному, в IObservable<T>
- например, Max
, Count
и т. Д.)