Горячая наблюдаемая с несколькими подписчиками без потери какого-либо события - PullRequest
0 голосов
/ 15 ноября 2018

Мне нужна горячая наблюдаемая, которая оборачивается ценой. Это подписано в нескольких областях.

Наблюдаемое создается с помощью Refcount и передается для подписки. Первые вызовы подписчика подписываются, следовательно, запускает поток и получает все события. Второй будет пропускать события до тех пор, пока не будет подписки, а последующие подписки будут вести себя так же.

Моя проблема не в пропущенных событиях. Я хочу, чтобы все подписчики получали одинаковые данные. То есть поток должен начинаться только после завершения запросов на подписку.

Возможно ли это?

Редактировать: два подхода и его проблемы проиллюстрированы ниже.

1)

public void HotObservableSubscriptionWithRefCount()
{       
    var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
                {
                    var publishVal = x;
                    Console.WriteLine($@"observer1 publishing {publishVal}");
                    return publishVal;
                }).Publish().RefCount();

    var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
                {
                    var publishVal = x + 100;
                    Console.WriteLine($@"observer2 publishing {publishVal}");
                    return publishVal;
                }).Publish().RefCount();

   var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value {x}"));
                var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2 
             value {x}"));

    Thread.Sleep(TimeSpan.FromSeconds(1));

var combinedSub = obs1.Merge(obs2).Subscribe(x => Console.WriteLine($@"combined 
              subscriber value {x}"));

  Thread.Sleep(TimeSpan.FromSeconds(1));

   sub1.Dispose();
   sub2.Dispose();
   combinedSub.Dispose();

   Thread.Sleep(TimeSpan.FromSeconds(1));
}

Проблема: у объединенного подписчика отсутствуют значения из двух наблюдаемых из-за задержки в подписке

2)

 public void HotObservableSubscriptionWithPublish()
{
    var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
                {
                    var publishVal = x;
                    Console.WriteLine($@"observer1 publishing {publishVal}");
                    return publishVal;

                }).Publish();

    var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
                {
                    var publishVal = x + 100;
                    Console.WriteLine($@"observer2 publishing {publishVal}");
                    return publishVal;
                }).Publish();

    var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value 
               {x}"));
    var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2 value 
               {x}"));

             Thread.Sleep(TimeSpan.FromSeconds(1));

    var combinedSub = obs1.Merge(obs2).Subscribe(x => 
                Console.WriteLine($@"combined subscriber value {x}"));

    obs1.Connect();
    obs2.Connect();

   Thread.Sleep(TimeSpan.FromSeconds(1));

   sub1.Dispose();
   sub2.Dispose();
   combinedSub.Dispose();

   Thread.Sleep(TimeSpan.FromSeconds(1));
}

Это гарантирует, что комбинированный подписчик получит значения в соответствии с отдельными подписчиками. Однако даже если абоненты выбрасываются, наблюдатель по-прежнему продолжает предоставлять значения.

Мне нужен полный контроль жизненного цикла издателя и подписчика

...