Rx подписаться FromEvents объединить все - PullRequest
1 голос
/ 02 июня 2019

Скажем, у меня есть набор символов акций:

StockTicker stockTicker = 
  new StockTicker("MSFT", "APPL", "YHOO", "GOOG");
stockTicker.OnPriceChanged += (sender, args ) => 
{
  Console.WriteLine(
    "{0}: Price - {1}  Volume - {2}", 
    args.StockSymbol, args.Price, args.Volume);
};

Я могу подписаться на события и получить его как hot IObservable:

IObservable<PriceChangedEventArgs> priceChangedObservable = 
  Observable.FromEventPattern<PriceChangedEventArgs>(
    eventHandler => stockTicker.OnPriceChanged += eventHandler,
    eventHandler => stockTicker.OnPriceChanged -= eventHandler )
      .Select( eventPattern => eventPattern.EventArgs );

priceChangedObservable.Subscribe(args => Console.WriteLine( 
  "{0}: Price - {1}  Volume - {2}", 
    args.StockSymbol, args.Price, args.Volume ) );

Это печатает цитату длякаждый символ в виде новой кавычки: В последовательности каждое событие представляет собой одиночную кавычку YHOO 25.33, MSFT 127, AAPL 175, GOOG 1126 и т. д.

Как изменить приведенный выше код так, чтобы каждая «кавычка» представляла собой комбинацию всех текущих кавычеккаждая отдельная цитата?(YHOO 25.33, MSFT 127, GOOG 1126, AAPL 175).«Кавычка» теперь является состоянием последней кавычки, видимой для всех из них, служившей одной «кавычкой».

Я вижу, что Rx имеет оператор Zip, но семантика этого, по-видимому, нуждается вn IObservables, чтобы застегнуть молнию.Здесь есть несколько кавычек, проходящих через одно и то же IObservable?

Таким образом, Console.WriteLine будет печатать все n-кавычки, на которые подписано, как одно событие точки (и печатать только тогда, когда все они имеют значение)

Ответы [ 2 ]

3 голосов
/ 02 июня 2019

Если это случайные события, поступающие в любом порядке, и вас интересует только последнее значение в любое время, то, возможно, вы хотите, чтобы словарь обновлялся с использованием оператора Scan:

source
    /* keep track of the latest value for each symbol */
    .Scan(new Dictionary<string, decimal>(), (a, b) => a[b.StockSymbol] = b.Price)

    /* example logic to wait until they all have values */
    .Where(dict => stockTicker.StockSymbols.All(dict.ContainsKey))

    .Subscribe(dict => { ... });

Если число событий, которые вы хотите сохранить в буфере, остается постоянным и вы знаете, что источник циклически переключается между событиями, оператор Buffer может работать для вас:

source
    /* burst values periodically */
    .Buffer(count)

    .Subscribe(list => { ... }) 
1 голос
/ 02 июня 2019

Не очень удобно думать обо всем с точки зрения каналов передачи данных. Для DynamicData имеется отличная библиотека, которая создает концепцию реактивных коллекций.

Я предполагаю, PriceChangedEventArgs.StockSymbol это string

var cache = new SourceCache<PriceChangedEventArgs,string>(x => x.StockSymbol);
cache.PopulateFrom(priceChangedObservable);

теперь вы можете контролировать все предметы с помощью

cache.Connect()
  .Select(cache.Items)
  .Subscribe({...});

Простым решением является использование Scan, но с помощью реактивных коллекций вы можете применять преобразования, такие как concat, sort, filter и т. Д.

...