Режимы подписки «Эксклюзив» и «По умолчанию» в Rx - PullRequest
2 голосов
/ 22 июля 2011

У меня есть наблюдаемая последовательность объектов событий и несколько наблюдателей, обрабатывающих определенные типы событий. Мне нужно выполнить следующие сценарии:

  1. Некоторые типы событий должны обрабатываться первым наблюдателем, соответствующим условию (например, observable.SubscribeExclusively (x => {}), и стать "ненаблюдаемыми" для других.
  2. Если подписок нет, установите какой-либо обработчик по умолчанию (например, observable.SubscribeIfNoSubscription (x => {})), чтобы элементы не терялись (этот обработчик может, например, сохранить элемент в базе данных, чтобы он был обработан позже ).

Есть ли способ обработать эти случаи с помощью Rx?

Ответы [ 2 ]

1 голос
/ 03 августа 2011

«Эксклюзивность» проще - вы просто заставляете всех остальных подписаться на отфильтрованный вывод эксклюзивного наблюдателя.

«По умолчанию» сложнее - программирование RX является функциональным программированием, и подписчики не знают друг друга, тогда как по определению наличие подписчика «По умолчанию» означает наличие некоторого состояния общего между наблюдателями.Один из способов получить общее состояние - создать очередь производителя / потребителя, используя ConcurrentBag или BufferBlock из TPL DataFlow .Другой способ - присоединить «обработанное» состояние к самому событию, используя такой класс:

public class Handled<T>
{
    public bool IsHandled { get; set; }
    public T Data { get; set; }
}

В любом случае вам придется дать наблюдателям некоторое время, чтобы они отреагировали, прежде чем использовать «default»."обработчик.Код ниже иллюстрирует понятия «Эксклюзив» и «По умолчанию»:

        var source = new[] {0, 1, 2, 3, 4}.ToObservable();
        var afterExclusive = source
            .Where(x =>
                       {
                           if (x == 0)
                           {
                               Console.WriteLine("exclusive");
                               return false;
                           }
                           return true;
                       })
            .Select(x => new Handled<int> {Data = x})
            .Publish(); // publish is a must otherwise 
        afterExclusive  // we'll get non shared objects
            .Do(x => { x.IsHandled = true; })
            .Subscribe();
        afterExclusive
            .Delay(TimeSpan.FromSeconds(1))
            .Where(x => !x.IsHandled)
            .Subscribe(x => Console.WriteLine("missed by all {0}", x));
        afterExclusive.Connect(); 
0 голосов
/ 23 июля 2011

Я не уверен, что вполне ухмыляюсь по вашему сценарию, но как это вас поразило:

IObservable<Event> streamOfEvents.SelectMany(x => {
    if (matchesExclusiveItem1(x)) {
        x += exclusiveItem1Handler;
        return Observable.Empty<Event>();
    }

    // Default case
    return Observable.Return(x);
}).Subscribe(x => {
    // Default case
    x += defaultHandler;
});

Я использую «Объекты событий», потому что это то, что вы указали, но, вероятно, было бы лучше использовать IObservable<IObservable<T>> - у этого селектора есть побочные эффекты (подключение к событию), что не очень хорошо.

...