Мне нужна горячая наблюдаемая, которая оборачивается ценой. Это подписано в нескольких областях.
Наблюдаемое создается с помощью Refcount и передается для подписки. Первые вызовы подписчика подписываются, следовательно, запускает поток и получает все события. Второй будет пропускать события до тех пор, пока не будет подписки, а последующие подписки будут вести себя так же.
Моя проблема не в пропущенных событиях. Я хочу, чтобы все подписчики получали одинаковые данные. То есть поток должен начинаться только после завершения запросов на подписку.
Возможно ли это?
Редактировать: два подхода и его проблемы проиллюстрированы ниже.
1)
public void HotObservableSubscriptionWithRefCount()
{
var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x;
Console.WriteLine($@"observer1 publishing {publishVal}");
return publishVal;
}).Publish().RefCount();
var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x + 100;
Console.WriteLine($@"observer2 publishing {publishVal}");
return publishVal;
}).Publish().RefCount();
var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value {x}"));
var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2
value {x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
var combinedSub = obs1.Merge(obs2).Subscribe(x => Console.WriteLine($@"combined
subscriber value {x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
sub1.Dispose();
sub2.Dispose();
combinedSub.Dispose();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Проблема: у объединенного подписчика отсутствуют значения из двух наблюдаемых из-за задержки в подписке
2)
public void HotObservableSubscriptionWithPublish()
{
var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x;
Console.WriteLine($@"observer1 publishing {publishVal}");
return publishVal;
}).Publish();
var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x + 100;
Console.WriteLine($@"observer2 publishing {publishVal}");
return publishVal;
}).Publish();
var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value
{x}"));
var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2 value
{x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
var combinedSub = obs1.Merge(obs2).Subscribe(x =>
Console.WriteLine($@"combined subscriber value {x}"));
obs1.Connect();
obs2.Connect();
Thread.Sleep(TimeSpan.FromSeconds(1));
sub1.Dispose();
sub2.Dispose();
combinedSub.Dispose();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Это гарантирует, что комбинированный подписчик получит значения в соответствии с отдельными подписчиками. Однако даже если абоненты выбрасываются, наблюдатель по-прежнему продолжает предоставлять значения.
Мне нужен полный контроль жизненного цикла издателя и подписчика