Как включиться в раздвижные окна - PullRequest
0 голосов
/ 21 октября 2018

У меня есть последовательность символов акций и последовательность цен акций.Каждый раз, когда я получаю символ акции (он гарантированно будет уникальным), мне нужно охватить окно 100 миллисекунд и обработать цену акции из последовательности цен акций.Если цена акции отсутствует в последовательности в течение этих 100 миллисекунд, мне нужно обработать акцию без цены.

Мраморная диаграмма для лучшей визуализации требования:

Stock : -S1--S2--

Price : ---P1-P2-

Result: -S1---S2P2-

Итак, акция 1 входит, 100 окно миллисекунды перекрывается, но для символа нет цены, поэтомурезультат должен быть просто акция 1 (S1).

Затем наступает акция 2, снова включается окно на 100 миллисекунд, и цена акции 2 еще не определена.Однако до закрытия 100-миллисекундного окна мы получаем цену для акций 2 (P2), следовательно, в результате получается цена 2 с ее ценой (S2P2).

Возможно, что ценыбудет поступать в случайном порядке, поэтому нельзя делать предположения относительно последовательности цен.

Я видел этот связанный вопрос SO , но не могу заставить его работать.Я пытаюсь использовать GroupJoin

stockSubject
.GroupJoin(
    stockPriceSubject,
    stock => Observable.Timer(TimeSpan.FromMilliseconds(100)),
    price => Observable.Never<Unit>(),
    (stock, stockPrice) =>
    {            
        var stockPrices = stockPrice.Where(sprice => sprice.Stock.Equals(stock))
                                    .FirstOrDefaultAsync()
                                    .DefaultIfEmpty();
        return (Stock: stock, StockPrices: stockPrices);
    })
.Subscribe(async tuple => WriteLine($"{tuple.Stock} | {(await tuple.StockPrices)?.Price ?? 'N'}"));

Это не работает, так как теряет некоторые цены (бывает недетерминированным, поэтому не могу понять, в чем дело).

ДругойПодход, который я пробовал, работая, не выглядит оптимальным

stockSubject
    .Subscribe(stock =>
    {
        stockPriceSubject
            .Buffer(TimeSpan.FromMilliseconds(100))
            .Take(1)
            .Subscribe(bufferedPrices =>
            {
                var stockPrice = bufferedPrices.FirstOrDefault(price => price.Stock.Equals(stock));
                if (stockPrice == null)
                {
                    Console.WriteLine($"{stock} is w/o price");
                    return;
                }

                Console.WriteLine($"{stockPrice}");
            });
    });

Одна вещь, которую мне действительно не нравится в этом, это то, что я оставляю подписку позади каждый раз, когда естьновая акция, когда я подписываюсь на буферизованные цены.

Любая подсказка, каков наилучший способ реализации этого сценария с использованием Rx?

Соответствующие классы для акций и цены на акции

sealed class Stock : IEquatable<Stock>
{
    public Stock(string symbol)
    {
        Symbol = symbol;
    }

    public string Symbol { get; }

    public override string ToString() =>
        $"Stock[{Symbol}]";

    // IEquatable implementation is emitted for the sake of brevity
}

sealed class StockPrice
{
    public StockPrice(Stock stock, decimal price)
    {
        Stock = stock;
        Price = price;
    }

    public Stock Stock { get; }
    public decimal Price { get; }

    public override string ToString() =>
        $"{Stock} is traded @ {Price}";
}

РЕДАКТИРОВАТЬ добавление генератора кода тестовых данных в соответствии с запросом

Каждые 10 миллисекунд новая акция добавляется в последовательность запасов ( MSFT -> GOOG -> APPL).

Каждые 20 миллисекунд новая цена добавляется в последовательность цен ( APPL -> GOOG ).

Через 1 секундуна складе MSFT подталкивается к последовательности цен.

Ожидаемый результат:

Один раз MSFT передается в последовательность акций, открывается окно с ценой в 100 миллисекунд ... в течение 100 миллисекунд цена для MSFT не передается в последовательность цен, следовательно, MSFT акция должна обрабатываться без цены (в результирующем наборе цена пуста / равна нулю)

После того, как GOOG перемещается в последовательность акций, снова открывается окно 100 миллисекунд, на этот разцена за GOOG акции в течение 100 миллисекунд, следовательно, GOOG акции должны обрабатываться с ценой (15м).

И, наконец, APPL -ожидаемый результат здесь такой же, как с MSFT ..., поскольку цена APPL не была установлена ​​в течение 100 миллисекунд, поскольку она была передана в последовательность акций, ее следует обрабатывать без цены.Здесь тот факт, что ранее была опубликована цена акций APPL , ни на что не должен повлиять.

1 Ответ

0 голосов
/ 22 октября 2018

Невозможно проверить ваш ответ без некоторого тестового кода.Я также не уверен, что вы хотите сделать с данными вниз по течению.Если этого ответа недостаточно, пожалуйста, исправьте вопрос с этой информацией.

Решение того, что вы спрашиваете, я считаю довольно простым:

stocks
    .Select(s => (Stock: s, StockPrices: prices
        .TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100)))
        .Where(p => p.Stock == s)
    ));

Это приведет к множественномупроблемы с подпиской на prices, поэтому это можно исправить следующим образом:

prices.Publish(_prices => 
    stocks
        .Select(s => (Stock: s, StockPrices: _prices
            .TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100)))
            .Where(p => p.Stock == s)
        ))
    );

Join и GroupJoin не будут работать так же хорошо, если вы получите 0 цен засклад.Я не рекомендовал бы это для вашего сценария.Однако, если вы вернетесь к нему, вы должны изменить Observable.Never на Observable.Empty.Never оставляет окно цены открытым навсегда, поэтому к старой цене можно присоединиться с новой акцией.


РЕДАКТИРОВАТЬ :

Вот некоторый код тестирования, использующийMicrosoft.Reactive.Testing:

TestScheduler ts = new TestScheduler();
var stockSource = ts.CreateHotObservable<Stock>(
    new Recorded<Notification<Stock>>(10.MsTicks(), Notification.CreateOnNext(new Stock("MSFT"))),
    new Recorded<Notification<Stock>>(20.MsTicks(), Notification.CreateOnNext(new Stock("GOOG"))),
    new Recorded<Notification<Stock>>(30.MsTicks(), Notification.CreateOnNext(new Stock("AAPL")))
);

var priceSource = ts.CreateHotObservable<StockPrice>(
    new Recorded<Notification<StockPrice>>(20.MsTicks(), Notification.CreateOnNext(new StockPrice(new Stock("AAPL"), 10m))),
    new Recorded<Notification<StockPrice>>(40.MsTicks(), Notification.CreateOnNext(new StockPrice(new Stock("GOOG"), 15m)))
);


var target = priceSource.Publish(_prices =>
    stockSource
        .Select(s => (Stock: s, StockPrices: _prices
            .TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100), ts))
            .Where(p => p.Stock.Symbol == s.Symbol)
        ))
    );
var observer = ts.CreateObserver<(Stock, IObservable<StockPrice>)>();
target.Subscribe(observer);

var target2 = target.SelectMany(t => t.StockPrices.Select(sp => (Stock: t.Stock, Price: sp)));
var observer2 = ts.CreateObserver<(Stock, StockPrice)>();
target2.Subscribe(observer2);
ts.Start();

observer.Messages.Dump();   //LinqPad
observer2.Messages.Dump();  //LinqPad

и с использованием метода расширения:

public static class Extensions
{
    public static long MsTicks(this int i)
    {
        return TimeSpan.FromMilliseconds(i).Ticks;
    }
}

Для меня это работает.Единственной проблемой было отсутствие реализации.Поэтому я переключился с .Where(p => p.Stock == s) на .Where(p => p.Stock.Symbol == s.Symbol).Возможно, это ваша проблема?

...