Rx Extensions: Как сделать подписку зависимой от другой подписки? - PullRequest
0 голосов
/ 16 июля 2011

У меня есть класс, который принимает наблюдаемое в своем конструкторе, затем подписывается на него и выполняет некоторые вещи, устанавливает свойства и т. Д. Сам класс является наблюдаемым.

Я хочу подписаться на свой источник наблюдения, только если кто-то подписан на мой класс, но я не могу понять, как это сделать.

public MyClass : IObservable<MyResult>
{
    private readonly Subject<MyResult> _subject = new Subject<MyResult>();
    private readonly IConnectableObservable<MySource> _source;

    public MyClass(IObservable<MySource> source)
    {
         _source = source
             //All my logic to set properties and such
             //goes here as a side effect, instead of in a subscription...
             .Do(...)
             //I hope that by publishing, side effects will happen only once...
             .Publish();
    }

    public IDisposable Subscribe(IObserver<MyResult> observer)
    {
        return new CompositeDisposable(
             _source.Subscribe(/* 
                  don't have anything to do here,
                  just subscribing to make sure I'm subscribed to source...
                  (this can't be the right way to do it)
             */),
             _subject.Subscribe(observer));
    }
}

UPDATE

@ Скотт: я понимаю, почему реализация IObservable была бы анти-паттерном. My Class должен потреблять одну наблюдаемую и выставлять 3 в качестве свойств (первоначально наиболее часто используемая наблюдаемая должна была быть возвращена самим MyClass, но я думаю, что иметь ее как свойство может быть лучше.

То, что я пытаюсь написать, является наблюдаемой ICommand. Я знаю, что некоторые существуют, но это больше способ выучить Rx ...

public class ObservableCommand<T> : ICommand
{
    private readonly ISubject<T> _executeRequests = new Subject<T>();
    private readonly ISubject<T> _canExecuteRequests = new Subject<T>();

    public IObservable<bool> CanExecuteChanges { get; private set; }
    public IObservable<T> CanExecuteRequests { get; private set; }
    public IObservable<T> ExecuteRequests { get; private set; }

    public ObservableCommand(IObservable<bool> canExecute)
    {
        var source = canExecute.DistinctUntilChanged()

        //How do I dispose of subscription later?
        //I have this fear that I'm going to have a chain of references, 
        //and my entire app will never get GC'd!
        var subscription = source.Subscribe(
            o => {
                if (CanExecuteChanged != null)
                    CanExecuteChanged(this, EventArgs.Empty);
            });

        CanExecuteChanges = source;

        CanExecuteRequests = _canExecuteRequests.AsObservable();

        ExecuteRequests = _executeRequests.AsObservable();
    }

    #region ICommand Members

    public bool  CanExecute(object parameter)
    {
        _canExecuteRequests.OnNext(parameter is T ? (T)parameter : default(T));
    }

    public event EventHandler  CanExecuteChanged;

    public void  Execute(object parameter)
    {
        _executeRequests.OnNext(parameter is T ? (T)parameter : default(T));
    }

    #endregion
}

Ответы [ 2 ]

1 голос
/ 16 июля 2011

Как насчет того, чтобы просто не Do вводить или Publish вводить в конструкторе, а использовать метод Subscribe?

Следует сказать, что явная реализация IObservable<T> является чем-то вроде анти-паттерна Rx.

Вы можете сделать Подписки зависимыми от других подписчиков с помощью Defer и Create, что-то вроде

IObservable<MySource> source;
IObservable<MySource> sourceWithSubSideEffect =  Observable.Defer(() =>
{
   // Do something interesting on Subscription
   // ....
   return source;
});
0 голосов
/ 17 июля 2011

Я приготовил для тебя ножницу. MyClass реализует IObservable<T>, а также имеет методы IObserver<T>, но все они являются закрытыми. С дополнительными OnInitialize и OnSubscribe вы сможете делать все, что захотите, на любое событие, на которое хотите ответить.

Если вы хотите сделать этот отсканированный многоразовым, вы можете определить все методы как partial, так как все они возвращают void. Тогда вы можете создать определение для того, что вы хотите.

public class MyClass<T> : IObservable<T>
{
    private readonly IObservable<T> m_Source;

    public MyClass(IObservable<T> source)
    {
        if (source == null) throw new ArgumentNullException("source");
        m_Source = source.Do(OnNext, OnError, OnCompleted);
        OnInitialize();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        OnSubscribe();
        return m_Source.Subscribe(observer);
    }

    private void OnInitialize()
    {
        Console.WriteLine("OnInitialize");
    }
    private void OnSubscribe()
    {
        Console.WriteLine("OnSubscribe");
    }
    private void OnNext(T value)
    {
        Console.WriteLine("OnNext: {0}", value);
    }
    private void OnError(Exception error)
    {
        Console.WriteLine("OnError: {0}", error.Message);
    }
    private void OnCompleted()
    {
        Console.WriteLine("OnCompleted");
    }    
}
...