Объедините несколько источников событий в один IObservable с Rx - PullRequest
9 голосов
/ 19 ноября 2011

Это вопрос о том, как использовать Reactive Extensions (Rx) в конкретном сценарии, связанном с событием.

  • Цель состоит в том, чтобы взять несколько классов, которые вызывают какое-то событие
  • И объедините их в один IObservable, на который могут подписаться любые клиенты (не подозревающие о классах событий).
  • Обратите внимание на использование интересующих событий в подклассе EventArgs

Некоторые пользовательские EventArgs

public class HappenedEventArgs : EventArgs
{
    internal bool IsBadNotGood;
}

Множество отдельных классов, в которых происходят события

public class EventSourceA : IEventSource {

    public event HappenedEventHandler Happened;
    private void OnHappened(HappenedEventArgs e)
    {
        if (Happened != null)
            Happened(this, e);
    }
    // And then this class calls OnHappened(e) whenever it decides to ...
}

public class EventSourceB : IEventSource {

    public event HappenedEventHandler Happened;
    private void OnHappened(HappenedEventArgs e)
    {
        if (Happened != null)
            Happened(this, e);
    }
    // And then this class also calls OnHappened(e) at times ...
}

public interface IEventSource
{
    event HappenedEventHandler Happened;
}

public delegate void HappenedEventHandler(object sender, HappenedEventArgs e);

Как собрать все эти события и раскрыть единый фронт событий

public class Pooler{

    private IObservable<X> _pool;

    public IObservable<X> Subscribe(){
        return _pool;        
    }

    public void Register(IEventSource item)
    {
        // How to take item.Happened and inject/bind it into _pool here?
    }        

    internal void Unregister(IEventSource item)
    {
        // Disconnect item.Happened from _pool
    }

    public Pooler(){
        // Instantiate _pool to whatever is best?
        // _pool = ...
    }

 }

Абонент, который ничего не знает о источниках событий напрямую

 static void Try() {
     var pooler = new Pooler();
     pooler.Subscribe().Subscribe(e =>
            {
                 // Do something with events here, as they arrive
            }
     );
     // ....
     // Wherever whenever:
     AddEventSources(pooler);
 }

 static void AddEventSources(Pooler pooler){
     var eventSourceA = new EventSourceA();
     pooler.Register(eventSourceA);
     var eventSourceB = new EventSourceB();
     pooler.Register(eventSourceB);     
 }

Ответы [ 2 ]

7 голосов
/ 19 ноября 2011

Библиотека Rx пытается обеспечить способы обработки подобных ситуаций без необходимости создавать группу классов / методов, которые вручную распространяют наблюдаемые.

Допустим, у вас был класс с событием:

public class EventedClass
{
    public event Action<EventArgs> Event;
}

И множество этих экземпляров IEnumerable<EventedClass> objects, Вы можете использовать LINQ для проецирования наблюдаемых из этих классов, комбинировать их с Observable.Merge, что даст вам комбинированный последовательный вывод этих событий.

Observable.Merge(
    objects.Select(
        o => Observable.FromEvent<EventArgs>(
            handler => o.Event += handler,
            handler => o.Event -= handler
        )
)).Subscribe(args => 
{ 
    //do stuff
});
4 голосов
/ 21 ноября 2011

Похоже, вы делаете что-то похожее на этот вопрос .По сути, вы хотите использовать тему в качестве переменной _pool, чтобы она подписывалась и отписывалась от различных источников событий в «Регистрация» и «Отмена регистрации».Чтобы отменить регистрацию источника, вам нужно будет сохранить одноразовые материалы, полученные при звонке в «Регистрация».Кроме того, я хотел бы рассмотреть возможность Pooler реализовать IObservable напрямую и просто переслать Subscribe в переменную _pool.

using System.Reactive.Subjects;
using System.Reactive.Linq;

public class Pooler 
    : IObservable<HappenedEventArgs>, 
      IDisposable
{

    void Dispose()
    {
        if (_pool != null) _pool.Dispose();
        if (_sourceSubs != null)
        {
            foreach (var d in _sourceSubs.Values)
            {
                d.Dispose();
            }
            _sourceSubs.Clear();
        }
    }

    private Subject<HappenedEventArgs> _pool = new Subject<HappenedEventArgs>();
    private Dictionary<IEventSource, IDisposable> _sourceSubs = new Dictionary<IEventSource, IDisposable>();

    public IDisposable Subscribe(IObserver<HappenedEventArgs> observer)
    {
        return _pool.Subscribe(observer);
    }

    public void Register(IEventSource item)
    {
        if (_sourceSubs.ContainsKey(item))
        {
            return; //already registered
        }
        else
        {
            _sourceSubs.Add(item,
                            Observable.FromEventPattern((EventHandler<HappenedEventArgs> h) => item.Happened += h,
                                                        h => item.Happened -= h)
                                      .Select(ep => ep.EventArgs)
                                      .Subscribe(_pool));
        }
    }

    internal void Unregister(IEventSource item)
    {
        IDisposable disp = null;
        if (_sourceSubs.TryGetValue(item, out disp))
        {
            _sourceSubs.Remove(item);
            disp.Dispose();
        }
    }
}

Обратите внимание, что вам потребуется реализовать IDisposable, чтобы гарантировать, что выможет очистить все подписки на события, когда вы закончите с Pooler.

...