Возврат одноразового метода подписки для наблюдаемого - PullRequest
0 голосов
/ 25 мая 2018

У меня есть вопрос по Observables (, который я разместил на подфоруме издателей для этой книги, но я все еще жду ответа).

Я использую вспомогательные методы, предоставленные какэто стандартная практика, а не ручная работа наблюдаемых.Однако, просто из академического интереса, я увидел, что нужно для создания наблюдаемого.

Я видел реализацию в книге, в которой в конце метода подписки был возвращен Disposable.Empty.Код выглядит примерно так:

public class MyObservable : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < 5; i++)
        {
            Thread.Sleep(1000);
            observer.OnNext(i);
        }
        observer.OnCompleted();
        return Disposable.Empty;
    }
}

Если я хочу вернуть правильный Disposable, который на самом деле приведет к отмене подписки, когда вызывается Dispose, как должно быть?

У меня был трескпри этом используя это для Observable и это для Observer

Мне пришлось ввести обработчик подписки

public class SubscriptionHandler : IDisposable
{
    private readonly List<IObserver<int>> _listOfObservers;
    private readonly IObserver<int> _currentObserver;

    public SubscriptionHandler(List<IObserver<int>> currentListOfObservers, IObserver<int> currentObserver)
    {
        _listOfObservers = currentListOfObservers;
        _currentObserver = currentObserver;
    }

    public void Dispose()
    {
        if (_currentObserver != null && _listOfObservers.Contains(_currentObserver))
        {
            _listOfObservers.Remove(_currentObserver);
        }
    }
}

Это код дляНаблюдаемое

public class MyObservable : IObservable<int>
{
    private List<IObserver<int>> _listOfSubscribedObservers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        if (!_listOfSubscribedObservers.Contains(observer))
        {
            _listOfSubscribedObservers.Add(observer);
        }

        Task.Run(() =>
        {
            for (int i = 0; i < 5; i++)
            {
                Thread.Sleep(1000);
                observer.OnNext(i);
            }

            observer.OnCompleted();
        });

        return new SubscriptionHandler(_listOfSubscribedObservers, observer);
    }
}

У меня такое ощущение, что я что-то упускаю.Должен быть встроенный способ возврата значимого Disposable для Observable, созданного вручную, или это что-то, что поставляется только с Observable методами создания помощника?

PS: Если при голосовании, пожалуйста, укажите причины в комментариях, чтобы я мог избежать повторенияошибки в задании вопроса.

1 Ответ

0 голосов
/ 25 мая 2018

Я должен пояснить, что все это является демонстрацией внутренних элементов Rx-дизайна.Вы можете взглянуть на классы AnonymousObservable<T>, AnonymousObserver<T> и AnonymousDisposable, как это делает фреймворк.Довольно прямо вперед.Тем не менее, вы почти никогда не должны использовать этот код, скорее используйте такие вещи, как Disposable.Create и Observable.Create.Если вы реализуете IObservable, вы почти наверняка делаете это неправильно.

Вот основная идея: наблюдаемой необходимо создать IDisposable, которая удаляет соответствующего наблюдателя из внутреннего списка наблюдаемойнаблюдатели.Ваш код (ошибочно) удаляет всех наблюдателей из внутреннего списка.

Вот базовый одноразовый инструмент, который позволяет легко создавать функционально.С этим кодом GenericDisposable.Create совпадает с Disposable.Create(Action a).

public class GenericDisposable : IDisposable
{
    public static IDisposable Create(Action disposeAction)
    {
        return new GenericDisposable(disposeAction);
    }

    private readonly Action _disposeAction;
    public GenericDisposable(Action disposeAction)
    {
        _disposeAction = disposeAction;
    }
    public void Dispose()
    {
        _disposeAction();
    }
}

... и вот пример наблюдаемой реализации:

public class SendIntMessages : IObservable<int>
{
    private readonly HashSet<IObserver<int>> _observers = new HashSet<IObserver<int>>();

    protected void OnNext(int i)
    {
        foreach (var o in _observers)
            o.OnNext(i);
    }

    protected void OnError(Exception e)
    {
        foreach (var o in _observers)
            o.OnError(e);
    }

    protected void OnCompleted()
    {
        foreach (var o in _observers)
            o.OnCompleted();
    }

    public void SendIntMessage(int i)
    {
        OnNext(i);
    }

    public void EndStream()
    {
        OnCompleted();
    }

    public void SendError(Exception e)
    {
        OnError(e);
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return GenericDisposable.Create(() => _observers.Remove(observer));
    }
}

Это длительное выполнение,горячая наблюдаемая.Он отслеживает своих наблюдателей, и одноразовые отписываются от них.

В отличие от этого наблюдаемого:

public class CountTo5 : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnNext(3);
        observer.OnNext(4);
        observer.OnNext(5);

        return GenericDisposable.Create(() => {});
    }
}

Это «холодная» наблюдаемая, которая запускается немедленно.Нет возможности отписаться посередине: к тому времени, когда вы получаете одноразовое, наблюдаемое уже завершено.

Disposable.Empty - это простая короткая рука для DisposableCreate(() => {}).

...