Rx. net объединить вложенные потоки по групповому ключу - PullRequest
0 голосов
/ 07 мая 2020

Я новичок в rx. Пытаюсь объединить два потока.

  1. Позиции. Отслеживает набор позиций. Каждая позиция имеет уникальный идентификатор. Размер позиции меняется со временем. Каждая позиция представляет ценную бумагу, но в одной и той же ценной бумаге может быть несколько позиций.
  2. Риск. Отслеживает набор мер риска для ценной бумаги. У каждой ценной бумаги есть уникальный идентификатор, значения мер риска меняются со временем. Меры риска рассчитываются на основе номинального размера позиции 1000.

Я хочу объединить два потока на основе SecurityId, чтобы создать поток риска на уровне позиции. Когда в потоке «Позиции» появляется новая запись, я хочу присоединить к ней самую последнюю запись с тем же идентификатором SecurityId в потоке «Риск». Когда в потоке рисков появляется новая запись, я хочу присоединить к ней самые последние записи в потоке Positions с совпадающим SecurityId (может быть более 1 позиции). Необходимо умножить цену, чтобы отразить фактический размер позиции, а не номинальный. В остальном это просто объединение двух наборов свойств.

Я попытался использовать GroupJoin, но не смог понять, как использовать параметры продолжительности, чтобы выбрать самую последнюю для каждой группы. Я считаю, что достиг желаемого результата с помощью нижеприведенного, но надеюсь, что есть более чистый способ сделать это. В настоящее время я просто генерирую случайные тестовые данные. Вероятно, потребуется протестировать воспроизводимый набор входных данных, чтобы действительно убедиться, что это работает, но результат выглядит правильно.

Использование LINQPad для просмотра вывода, отсюда и .Dump () в конце.

var random = new Random();
// some dummy security names
var securities = new string[]{"ABC", "DEF", "HIF", "JKL", "MNO", "PQRS", "TUV", "WX", "YZ"}; 
// random set of 10 initial positions
var positions = Enumerable.Range(0,10).Select(i => new {PositionId=i, Size=random.Next(1,10)*1000.0, SecurityId=securities[random.Next(0,securities.Length)]}).ToArray() ;
// random set of initial risk records for each security
var risk = Enumerable.Range(0,securities.Length).Select(i => new {SecurityId=securities[i], Price=1.0+random.NextDouble(), NominalSize=1000.0, Duration=random.NextDouble()*10.0+2.0, Yield = random.NextDouble(), Convexity=random.NextDouble()*40.0+4.0}).ToArray() ;

// generate a random position update every half second. Group by SecurityId
var positionsBySec = Observable.Interval (TimeSpan.FromMilliseconds (500)).Select(x=>{ var idx = random.Next(0,10);
                                                                  var size = random.Next(-5,10)*1000.0;
                                                                  var secId = positions[idx].SecurityId;
                                                                  return new {PositionId=idx, Size=size, SecurityId=secId};
                                                                }
                                                            ).GroupBy(p=>p.SecurityId);


// generate a random risk update every half second. Group by SecurityId
var riskBySec = Observable.Interval (TimeSpan.FromMilliseconds (500)).Select(x=>{ var idx = random.Next(0,securities.Length);
                                                                 var secId = securities[idx];
                                                                 var price = 1.0+random.NextDouble();
                                                                 var duration=random.NextDouble()*10.0+2.0;
                                                                 var yield = random.NextDouble();
                                                                 var convexity=random.NextDouble()*40.0+4.0;
                                                                 return new {SecurityId=secId, Price=price*1000.0, NominalSize=1000.0, Duration=duration, Yield=yield, Convexity=convexity};
                                                                }
                                                            ).GroupBy(r =>r.SecurityId);

// View side by side 
//
//Util.HorizontalRun (true,
//
//  positionsBySec,    
//  riskBySec
//).Dump();
//
//

// join positions and risk by security id
positionsBySec.SelectMany(pg => pg.CombineLatest(riskBySec.Where(rg => rg.Key == pg.Key).SelectMany(r=>r), (p, r) => new {PositionId=p.PositionId, Size=p.Size, SecurityId=p.SecurityId, Price=(r.Price*p.Size)/r.NominalSize, Duration=r.Duration, Yield=r.Yield, Convexity=r.Convexity})).Dump();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...