У меня есть последовательность символов акций и последовательность цен акций.Каждый раз, когда я получаю символ акции (он гарантированно будет уникальным), мне нужно охватить окно 100 миллисекунд и обработать цену акции из последовательности цен акций.Если цена акции отсутствует в последовательности в течение этих 100 миллисекунд, мне нужно обработать акцию без цены.
Мраморная диаграмма для лучшей визуализации требования:
Stock : -S1--S2--
Price : ---P1-P2-
Result: -S1---S2P2-
Итак, акция 1
входит, 100 окно миллисекунды перекрывается, но для символа нет цены, поэтомурезультат должен быть просто акция 1
(S1
).
Затем наступает акция 2
, снова включается окно на 100 миллисекунд, и цена акции 2
еще не определена.Однако до закрытия 100-миллисекундного окна мы получаем цену для акций 2
(P2
), следовательно, в результате получается цена 2
с ее ценой (S2P2
).
Возможно, что ценыбудет поступать в случайном порядке, поэтому нельзя делать предположения относительно последовательности цен.
Я видел этот связанный вопрос SO , но не могу заставить его работать.Я пытаюсь использовать GroupJoin
stockSubject
.GroupJoin(
stockPriceSubject,
stock => Observable.Timer(TimeSpan.FromMilliseconds(100)),
price => Observable.Never<Unit>(),
(stock, stockPrice) =>
{
var stockPrices = stockPrice.Where(sprice => sprice.Stock.Equals(stock))
.FirstOrDefaultAsync()
.DefaultIfEmpty();
return (Stock: stock, StockPrices: stockPrices);
})
.Subscribe(async tuple => WriteLine($"{tuple.Stock} | {(await tuple.StockPrices)?.Price ?? 'N'}"));
Это не работает, так как теряет некоторые цены (бывает недетерминированным, поэтому не могу понять, в чем дело).
ДругойПодход, который я пробовал, работая, не выглядит оптимальным
stockSubject
.Subscribe(stock =>
{
stockPriceSubject
.Buffer(TimeSpan.FromMilliseconds(100))
.Take(1)
.Subscribe(bufferedPrices =>
{
var stockPrice = bufferedPrices.FirstOrDefault(price => price.Stock.Equals(stock));
if (stockPrice == null)
{
Console.WriteLine($"{stock} is w/o price");
return;
}
Console.WriteLine($"{stockPrice}");
});
});
Одна вещь, которую мне действительно не нравится в этом, это то, что я оставляю подписку позади каждый раз, когда естьновая акция, когда я подписываюсь на буферизованные цены.
Любая подсказка, каков наилучший способ реализации этого сценария с использованием Rx?
Соответствующие классы для акций и цены на акции
sealed class Stock : IEquatable<Stock>
{
public Stock(string symbol)
{
Symbol = symbol;
}
public string Symbol { get; }
public override string ToString() =>
$"Stock[{Symbol}]";
// IEquatable implementation is emitted for the sake of brevity
}
sealed class StockPrice
{
public StockPrice(Stock stock, decimal price)
{
Stock = stock;
Price = price;
}
public Stock Stock { get; }
public decimal Price { get; }
public override string ToString() =>
$"{Stock} is traded @ {Price}";
}
РЕДАКТИРОВАТЬ добавление генератора кода тестовых данных в соответствии с запросом
Каждые 10 миллисекунд новая акция добавляется в последовательность запасов ( MSFT -> GOOG -> APPL).
Каждые 20 миллисекунд новая цена добавляется в последовательность цен ( APPL -> GOOG ).
Через 1 секундуна складе MSFT подталкивается к последовательности цен.
Ожидаемый результат:
Один раз MSFT передается в последовательность акций, открывается окно с ценой в 100 миллисекунд ... в течение 100 миллисекунд цена для MSFT не передается в последовательность цен, следовательно, MSFT акция должна обрабатываться без цены (в результирующем наборе цена пуста / равна нулю)
После того, как GOOG перемещается в последовательность акций, снова открывается окно 100 миллисекунд, на этот разцена за GOOG акции в течение 100 миллисекунд, следовательно, GOOG акции должны обрабатываться с ценой (15м).
И, наконец, APPL -ожидаемый результат здесь такой же, как с MSFT ..., поскольку цена APPL не была установлена в течение 100 миллисекунд, поскольку она была передана в последовательность акций, ее следует обрабатывать без цены.Здесь тот факт, что ранее была опубликована цена акций APPL , ни на что не должен повлиять.