Распоряжаться внутренней подпиской на слияние - PullRequest
0 голосов
/ 14 ноября 2018

!! предупреждение: Rx новичок !!

У нас есть несколько ценовых каналов. Требуется подписаться на все эти каналы и выводить только последний тик каждые 1 сек (газ)

 public static class FeedHandler
{
        private static IObservable<PriceTick> _combinedPriceFeed = null;

          private static double _throttleFrequency = 1000;

        public static void AddToCombinedFeed(IObservable<PriceTick> feed)
        {
            _combinedPriceFeed = _combinedPriceFeed != null ? _combinedPriceFeed.Merge(feed) : feed;
            AddFeed(_combinedPriceFeed);
        }

              private static IDisposable _subscriber;

        private static void AddFeed(IObservable<PriceTick> feed)
        {
            _subscriber?.Dispose();
            _subscriber = feed.Buffer(TimeSpan.FromMilliseconds(_throttleFrequency)).Subscribe(buffer => buffer.GroupBy(x => x.InstrumentId, (key, result) => result.First()).ToObservable().Subscribe(NotifyClient));
        }

         public static void NotifyClient(PriceTick tick)
        {
        //Do some action
        }

}

Код имеет несколько проблем. Если я вызову AddToCombinedFeed с одним и тем же каналом несколько раз, потоки будут дублироваться для начала. Например. ниже

IObservable<PriceTick> feed1;

FeedHandler.AddToCombinedFeed(feed1);//1 stream
FeedHandler.AddToCombinedFeed(feed1);//2 streams(even though the groupby and first() functions will prevent this effect to propagate further

Это подводит меня к вопросу. Если я хочу удалить один поток цен из объединенного потока, как я могу это сделать?

Ответы [ 2 ]

0 голосов
/ 15 марта 2019

Вот код, который вам нужен:

private static SerialDisposable _subscriber = new SerialDisposable();

private static void AddFeed(IObservable<PriceTick> feed)
{
    _subscriber.Disposable =
        feed
            .Buffer(TimeSpan.FromMilliseconds(_throttleFrequency))
            .SelectMany(buffer =>
                buffer
                    .GroupBy(x => x.InstrumentId, (key, result) => result.First()))
            .Subscribe(NotifyClient);
}
0 голосов
/ 15 ноября 2018

Обновление - новое решение

С Dynamic-Data (MIT-лицензия) от RolandPheasant с Nuget.

  1. Использовать SourceList вместо списка
  2. Используйте оператор MergeMany

Код:

public class FeedHandler
{
    private readonly IDisposable _subscriber;
    private readonly SourceList<IObservable<PriceTick>> _feeds = new SourceList<IObservable<PriceTick>>();
    private readonly double _throttleFrequency = 1000;

    public FeedHandler()
    {
        var combinedPriceFeed = _feeds.Connect().MergeMany(x => x).Buffer(TimeSpan.FromMilliseconds(_throttleFrequency)).SelectMany(buffer => buffer.GroupBy(x => x.InstrumentId, (key, result) => result.First()));
        _subscriber = combinedPriceFeed.Subscribe(NotifyClient);
    }

    public void AddFeed(IObservable<PriceTick> feed) => _feeds.Add(feed);

    public void NotifyClient(PriceTick tick)
    {
        //Do some action
    }
}

Старое решение

  1. Исключите необходимость повторной подписки, применив метод Switch (). Ваш _combinedPriceFeed просто переключается на следующую наблюдаемую будет предоставлено _combinePriceFeedChange.
    1. Вести список для управления несколькими фидами. Создайте новую наблюдаемую при каждом изменении списка и предоставьте ее через _combinePriceFeedChange.
    2. Вы должны получить логику соответствующего метода удаления.

Код:

public class FeedHandler
{
    private readonly IDisposable _subscriber;
    private readonly IObservable<PriceTick> _combinedPriceFeed;
    private readonly List<IObservable<PriceTick>> _feeds = new List<IObservable<PriceTick>>();
    private readonly BehaviorSubject<IObservable<PriceTick>> _combinedPriceFeedChange = new BehaviorSubject<IObservable<PriceTick>>(Observable.Never<PriceTick>());
    private readonly double _throttleFrequency = 1000;

    public FeedHandler()
    {
        _combinedPriceFeed = _combinedPriceFeedChange.Switch().Buffer(TimeSpan.FromMilliseconds(_throttleFrequency)).SelectMany(buffer => buffer.GroupBy(x => x.InstrumentId, (key, result) => result.First()));
        _subscriber = _combinedPriceFeed.Subscribe(NotifyClient);
    }

    public void AddFeed(IObservable<PriceTick> feed)
    {
        _feeds.Add(feed);
        _combinedPriceFeedChange.OnNext(_feeds.Merge());
    }


    public void NotifyClient(PriceTick tick)
    {
        //Do some action
    }
}
...