Я пытаюсь создать два конвейера для каждого символа. Каждый символ будет Buffer
данными, основанными на двух таймфреймах, и будет выполнять DoCalc
на каждом таймфрейме.
priceChangedObservable = Observable.FromEvent<QuoteChangeEvent, IQuote>(handler =>
{
QuoteChangeEvent qHandler = (e) =>
{
handler(e);
};
return qHandler;
},
qHandler => bapi.MAPI.OnQuoteChange += qHandler,
qHandler => bapi.MAPI.OnQuoteChange -= qHandler
);
Если я сделаю следующее:
var els = new EventLoopScheduler();
var dispatcher = new EventLoopScheduler();
var multiCastStream = Observable.Publish(priceChangedObservable);
int timeFrame = 60;
multiCastStream
.GroupBy(instrument => instrument.Symbol)
.SelectMany(q => q)
.Buffer(TimeSpan.FromSeconds(timeFrame))
.Where(messages => messages.Any())
.SubscribeOn(els)
.ObserveOn(dispatcher)
.Select((sr) => DoCalc(sr, timeFrame))
.Subscribe((en) => { if (null != en) Console.WriteLine(en); });
// Start the producer
multiCastStream.Connect();
Все работает так, как я ожидаю. Если я закомментирую приведенный выше код и добавлю второй таймфрейм перед оператором multiCastStream.Connect()
:
int secondTimeFrame = 300;
multiCastStream
.GroupBy(instrument => instrument.Symbol)
.SelectMany(q => q)
.Buffer(TimeSpan.FromSeconds(secondTimeFrame))
.Where(messages => messages.Any())
.SubscribeOn(els)
.ObserveOn(dispatcher)
.Select((sr) => DoCalc(sr, secondTimeFrame))
.Subscribe((en) => { if (null != en) Console.WriteLine(en); });
Это также работает, как и ожидалось. Однако, если у меня работает оба кода, я получаю неожиданное поведение.
Есть ли что-то фундаментальное, что я упускаю, когда делюсь Hot Observables
?
РЕДАКТИРОВАТЬ 1
После изменения кода с ответом от Aron я получаю:
Number of quotes 1
60: 6/20/2019 10:53:26 PM=> M2KU9 Stats.
Number of quotes 1
300: 6/20/2019 10:53:26 PM=> M2KU9 Stats.
Number of quotes 40
60: 6/20/2019 10:54:26 PM=> MNQU9 Stats.
Тогда никакая другая статистика не будет напечатана.