Реализация наблюдателей и субъектов с использованием IObserver / IObservable - PullRequest
2 голосов
/ 26 сентября 2011

Я хочу создать класс, который можно использовать для представления динамически вычисляемого значения, а другой класс, представляющий значение, может быть источником (субъектом) для этих динамически вычисляемых значений. Цель состоит в том, чтобы при изменении субъекта вычисленное значение обновлялось автоматически.

Мне кажется, что использование IObservable / IObserver - это путь. К сожалению, я не могу использовать библиотеку Reactive Extensions, поэтому я вынужден реализовать шаблон субъекта / наблюдателя с нуля.

Хватит блабла, вот мои занятия:

public class Notifier<T> : IObservable<T>
{
    public Notifier();
    public IDisposable Subscribe(IObserver<T> observer);
    public void Subscribe(Action<T> action);
    public void Notify(T subject);
    public void EndTransmission();
}

public class Observer<T> : IObserver<T>, IDisposable
{
    public Observer(Action<T> action);
    public void Subscribe(Notifier<T> tracker);
    public void Unsubscribe();
    public void OnCompleted();
    public void OnError(Exception error);
    public void OnNext(T value);
    public void Dispose();
}

public class ObservableValue<T> : Notifier<T>
{
    public T Get();
    public void Set(T x);
}

public class ComputedValue<T>
{
    public T Get();
    public void Set(T x);
}

Моя реализация в основном взята из: http://msdn.microsoft.com/en-us/library/dd990377.aspx.

Итак, каким будет "правильный" способ сделать это? Примечание: меня не волнует LINQ, многопоточность или даже производительность. Я просто хочу, чтобы это было просто и легко понять.

1 Ответ

10 голосов
/ 26 сентября 2011

На вашем месте я бы постарался реализовать ваши классы настолько близко, насколько это возможно, к тому, как реализован Rx.

Одним из ключевых базовых принципов является использование относительно небольшого количества конкретных классов, которые объединяются с использованием большого количества операций. Поэтому вы должны создать несколько базовых строительных блоков и использовать композицию, чтобы объединить их все.

В Reflector.NET я хотел бы взглянуть на два класса: AnonymousObservable<T> & AnonymousObserver<T>. В частности, AnonymousObservable<T> используется через Rx как основу для создания наблюдаемых. Фактически, если вы посмотрите на объекты, которые являются производными от IObservable<T>, есть несколько специализированных реализаций, но только AnonymousObservable<T> предназначен для общего назначения.

Статический метод Observable.Create<T>() по сути является оберткой для AnonymousObservable<T>.

Другой класс Rx, который явно соответствует вашим требованиям, - BehaviorSubject<T>. Субъекты являются как наблюдаемыми, так и наблюдателями, и BehaviorSubject соответствует вашей ситуации, поскольку он запоминает последнее полученное значение.

Учитывая эти базовые классы, у вас почти есть все биты, необходимые для создания ваших конкретных объектов. Ваши объекты не должны наследоваться от приведенного выше кода, а вместо этого использовать композицию, чтобы объединить необходимое поведение.

Теперь я хотел бы предложить некоторые изменения в ваших классах, чтобы сделать их более совместимыми с Rx и, следовательно, более совместимыми и надежными.

Я бы бросил ваш Notifier<T> класс в пользу использования BehaviourSubject<T>.

Я бы бросил ваш Observer<T> класс в пользу использования AnonymousObserver<T>.

Тогда я бы изменил ObservableValue<T>, чтобы он выглядел так:

public class ObservableValue<T> : IObservable<T>, IDisposable
{
    public ObservableValue(T initial) { ... }
    public T Value { get; set; }
    public IDisposable Subscribe(IObserver<T> observer);
    public void Dispose();
}

Реализация ObservableValue<T> будет заключать в оболочку BehaviourSubject<T>, а не наследовать ее, поскольку предоставление членов IObserver<T> позволит получить доступ к OnCompleted & OnError, что не будет иметь большого смысла, так как этот класс представляет значение, а не вычисление. Подписки будут использовать AnonymousObservable<T>, а Dispose очистит завернутый BehaviourSubject<T>.

Тогда я бы изменил ComputedValue<T>, чтобы он выглядел так:

public class ComputedValue<T> : IObservable<T>, IDisposable
{
    public ComputedValue(IObservable<T> source) { ... }
    public T Value { get; }
    public IDisposable Subscribe(IObserver<T> observer);
    public void Dispose();
}

Класс ComputedValue<T> обернет AnonymousObservable<T> для всех подписчиков и будет использовать source, чтобы получить локальную копию значений для свойства Value. Метод Dispose будет использоваться для отмены подписки на source наблюдаемый.

Эти два последних класса - единственная реальная конкретная реализация, которая нужна вашему дизайну - и это только из-за свойства Value.

Далее вам нужен статический ObservableValues класс для ваших методов расширения:

public static class ObservableValues
{
    public static ObservableValue<T> Create<T>(T initial)
    { ... }

    public static ComputedValue<V> Compute<T, U, V>(
        this IObservable<T> left,
        IObservable<U> right,
        Func<T, U, V> computation)
    { ... }
}

Метод Compute будет использовать AnonymousObservable<V> для выполнения вычислений и выдаст IObservable<V> для передачи конструктору ComputedValue<V>, который возвращается методом.

Теперь, когда все это доступно, вы можете написать этот код:

var ov1 = ObservableValues.Create(1);
var ov2 = ObservableValues.Create(2);
var ov3 = ObservableValues.Create(3);

var cv1 = ov1.Compute(ov2, (x, y) => x + y);
var cv2 = ov3.Compute(cv1, (x, y) => x * y);

//cv2.Value == 9

ov1.Value = 2;
ov2.Value = 3;
ov3.Value = 4;

//cv2.Value == 20

Пожалуйста, дайте мне знать, если это полезно и / или есть что-то, что я могу уточнить.


РЕДАКТИРОВАТЬ: Также нужны некоторые расходные материалы.

Вам также нужно будет внедрить AnonymousDisposable & CompositeDisposable для управления подписками, особенно в методе расширения Compute. Взгляните на реализации Rx с использованием Reflector.NET или используйте мои версии ниже.

public sealed class AnonymousDisposable : IDisposable
{
    private readonly Action _action;
    private int _disposed;

    public AnonymousDisposable(Action action)
    {
        _action = action;
    }

    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) == 0)
        {
            _action();
        }
    }
}

public sealed class CompositeDisposable : IEnumerable<IDisposable>, IDisposable
{
    private readonly List<IDisposable> _disposables;
    private bool _disposed;

    public CompositeDisposable()
        : this(new IDisposable[] { })
    { }

    public CompositeDisposable(IEnumerable<IDisposable> disposables)
    {
        if (disposables == null) { throw new ArgumentNullException("disposables"); }
        this._disposables = new List<IDisposable>(disposables);
    }

    public CompositeDisposable(params IDisposable[] disposables)
    {
        if (disposables == null) { throw new ArgumentNullException("disposables"); }
        this._disposables = new List<IDisposable>(disposables);
    }

    public void Add(IDisposable disposable)
    {
        if (disposable == null) { throw new ArgumentNullException("disposable"); }
        lock (_disposables)
        {
            if (_disposed)
            {
                disposable.Dispose();
            }
            else
            {
                _disposables.Add(disposable);
            }
        }
    }

    public IDisposable Add(Action action)
    {
        if (action == null) { throw new ArgumentNullException("action"); }
        var disposable = new AnonymousDisposable(action);
        this.Add(disposable);
        return disposable;
    }

    public IDisposable Add<TDelegate>(Action<TDelegate> add, Action<TDelegate> remove, TDelegate handler)
    {
        if (add == null) { throw new ArgumentNullException("add"); }
        if (remove == null) { throw new ArgumentNullException("remove"); }
        if (handler == null) { throw new ArgumentNullException("handler"); }
        add(handler);
        return this.Add(() => remove(handler));
    }

    public void Clear()
    {
        lock (_disposables)
        {
            var disposables = _disposables.ToArray();
            _disposables.Clear();
            Array.ForEach(disposables, d => d.Dispose());
        }
    }

    public void Dispose()
    {
        lock (_disposables)
        {
            if (!_disposed)
            {
                this.Clear();
            }
            _disposed = true;
        }
    }

    public IEnumerator<IDisposable> GetEnumerator()
    {
        lock (_disposables)
        {
            return _disposables.ToArray().AsEnumerable().GetEnumerator();
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    public bool IsDisposed
    {
        get
        {
            return _disposed;
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...