!! предупреждение: Rx новичок !!
У нас есть несколько ценовых каналов. Требуется подписаться на все эти каналы и выводить только последний тик каждые 1 сек (газ)
public static class FeedHandler
{
private static IObservable<PriceTick> _combinedPriceFeed = null;
private static double _throttleFrequency = 1000;
public static void AddToCombinedFeed(IObservable<PriceTick> feed)
{
_combinedPriceFeed = _combinedPriceFeed != null ? _combinedPriceFeed.Merge(feed) : feed;
AddFeed(_combinedPriceFeed);
}
private static IDisposable _subscriber;
private static void AddFeed(IObservable<PriceTick> feed)
{
_subscriber?.Dispose();
_subscriber = feed.Buffer(TimeSpan.FromMilliseconds(_throttleFrequency)).Subscribe(buffer => buffer.GroupBy(x => x.InstrumentId, (key, result) => result.First()).ToObservable().Subscribe(NotifyClient));
}
public static void NotifyClient(PriceTick tick)
{
//Do some action
}
}
Код имеет несколько проблем. Если я вызову AddToCombinedFeed с одним и тем же каналом несколько раз, потоки будут дублироваться для начала. Например. ниже
IObservable<PriceTick> feed1;
FeedHandler.AddToCombinedFeed(feed1);//1 stream
FeedHandler.AddToCombinedFeed(feed1);//2 streams(even though the groupby and first() functions will prevent this effect to propagate further
Это подводит меня к вопросу. Если я хочу удалить один поток цен из объединенного потока, как я могу это сделать?