«Эксклюзивность» проще - вы просто заставляете всех остальных подписаться на отфильтрованный вывод эксклюзивного наблюдателя.
«По умолчанию» сложнее - программирование RX является функциональным программированием, и подписчики не знают друг друга, тогда как по определению наличие подписчика «По умолчанию» означает наличие некоторого состояния общего между наблюдателями.Один из способов получить общее состояние - создать очередь производителя / потребителя, используя ConcurrentBag или BufferBlock из TPL DataFlow .Другой способ - присоединить «обработанное» состояние к самому событию, используя такой класс:
public class Handled<T>
{
public bool IsHandled { get; set; }
public T Data { get; set; }
}
В любом случае вам придется дать наблюдателям некоторое время, чтобы они отреагировали, прежде чем использовать «default»."обработчик.Код ниже иллюстрирует понятия «Эксклюзив» и «По умолчанию»:
var source = new[] {0, 1, 2, 3, 4}.ToObservable();
var afterExclusive = source
.Where(x =>
{
if (x == 0)
{
Console.WriteLine("exclusive");
return false;
}
return true;
})
.Select(x => new Handled<int> {Data = x})
.Publish(); // publish is a must otherwise
afterExclusive // we'll get non shared objects
.Do(x => { x.IsHandled = true; })
.Subscribe();
afterExclusive
.Delay(TimeSpan.FromSeconds(1))
.Where(x => !x.IsHandled)
.Subscribe(x => Console.WriteLine("missed by all {0}", x));
afterExclusive.Connect();