На вашем месте я бы постарался реализовать ваши классы настолько близко, насколько это возможно, к тому, как реализован Rx.
Одним из ключевых базовых принципов является использование относительно небольшого количества конкретных классов, которые объединяются с использованием большого количества операций. Поэтому вы должны создать несколько базовых строительных блоков и использовать композицию, чтобы объединить их все.
В Reflector.NET я хотел бы взглянуть на два класса: AnonymousObservable<T>
& AnonymousObserver<T>
. В частности, AnonymousObservable<T>
используется через Rx как основу для создания наблюдаемых. Фактически, если вы посмотрите на объекты, которые являются производными от IObservable<T>
, есть несколько специализированных реализаций, но только AnonymousObservable<T>
предназначен для общего назначения.
Статический метод Observable.Create<T>()
по сути является оберткой для AnonymousObservable<T>
.
Другой класс Rx, который явно соответствует вашим требованиям, - BehaviorSubject<T>
. Субъекты являются как наблюдаемыми, так и наблюдателями, и BehaviorSubject
соответствует вашей ситуации, поскольку он запоминает последнее полученное значение.
Учитывая эти базовые классы, у вас почти есть все биты, необходимые для создания ваших конкретных объектов. Ваши объекты не должны наследоваться от приведенного выше кода, а вместо этого использовать композицию, чтобы объединить необходимое поведение.
Теперь я хотел бы предложить некоторые изменения в ваших классах, чтобы сделать их более совместимыми с Rx и, следовательно, более совместимыми и надежными.
Я бы бросил ваш Notifier<T>
класс в пользу использования BehaviourSubject<T>
.
Я бы бросил ваш Observer<T>
класс в пользу использования AnonymousObserver<T>
.
Тогда я бы изменил ObservableValue<T>
, чтобы он выглядел так:
public class ObservableValue<T> : IObservable<T>, IDisposable
{
public ObservableValue(T initial) { ... }
public T Value { get; set; }
public IDisposable Subscribe(IObserver<T> observer);
public void Dispose();
}
Реализация ObservableValue<T>
будет заключать в оболочку BehaviourSubject<T>
, а не наследовать ее, поскольку предоставление членов IObserver<T>
позволит получить доступ к OnCompleted
& OnError
, что не будет иметь большого смысла, так как этот класс представляет значение, а не вычисление. Подписки будут использовать AnonymousObservable<T>
, а Dispose
очистит завернутый BehaviourSubject<T>
.
Тогда я бы изменил ComputedValue<T>
, чтобы он выглядел так:
public class ComputedValue<T> : IObservable<T>, IDisposable
{
public ComputedValue(IObservable<T> source) { ... }
public T Value { get; }
public IDisposable Subscribe(IObserver<T> observer);
public void Dispose();
}
Класс ComputedValue<T>
обернет AnonymousObservable<T>
для всех подписчиков и будет использовать source
, чтобы получить локальную копию значений для свойства Value
. Метод Dispose
будет использоваться для отмены подписки на source
наблюдаемый.
Эти два последних класса - единственная реальная конкретная реализация, которая нужна вашему дизайну - и это только из-за свойства Value
.
Далее вам нужен статический ObservableValues
класс для ваших методов расширения:
public static class ObservableValues
{
public static ObservableValue<T> Create<T>(T initial)
{ ... }
public static ComputedValue<V> Compute<T, U, V>(
this IObservable<T> left,
IObservable<U> right,
Func<T, U, V> computation)
{ ... }
}
Метод Compute
будет использовать AnonymousObservable<V>
для выполнения вычислений и выдаст IObservable<V>
для передачи конструктору ComputedValue<V>
, который возвращается методом.
Теперь, когда все это доступно, вы можете написать этот код:
var ov1 = ObservableValues.Create(1);
var ov2 = ObservableValues.Create(2);
var ov3 = ObservableValues.Create(3);
var cv1 = ov1.Compute(ov2, (x, y) => x + y);
var cv2 = ov3.Compute(cv1, (x, y) => x * y);
//cv2.Value == 9
ov1.Value = 2;
ov2.Value = 3;
ov3.Value = 4;
//cv2.Value == 20
Пожалуйста, дайте мне знать, если это полезно и / или есть что-то, что я могу уточнить.
РЕДАКТИРОВАТЬ: Также нужны некоторые расходные материалы.
Вам также нужно будет внедрить AnonymousDisposable
& CompositeDisposable
для управления подписками, особенно в методе расширения Compute
. Взгляните на реализации Rx с использованием Reflector.NET или используйте мои версии ниже.
public sealed class AnonymousDisposable : IDisposable
{
private readonly Action _action;
private int _disposed;
public AnonymousDisposable(Action action)
{
_action = action;
}
public void Dispose()
{
if (Interlocked.Exchange(ref _disposed, 1) == 0)
{
_action();
}
}
}
public sealed class CompositeDisposable : IEnumerable<IDisposable>, IDisposable
{
private readonly List<IDisposable> _disposables;
private bool _disposed;
public CompositeDisposable()
: this(new IDisposable[] { })
{ }
public CompositeDisposable(IEnumerable<IDisposable> disposables)
{
if (disposables == null) { throw new ArgumentNullException("disposables"); }
this._disposables = new List<IDisposable>(disposables);
}
public CompositeDisposable(params IDisposable[] disposables)
{
if (disposables == null) { throw new ArgumentNullException("disposables"); }
this._disposables = new List<IDisposable>(disposables);
}
public void Add(IDisposable disposable)
{
if (disposable == null) { throw new ArgumentNullException("disposable"); }
lock (_disposables)
{
if (_disposed)
{
disposable.Dispose();
}
else
{
_disposables.Add(disposable);
}
}
}
public IDisposable Add(Action action)
{
if (action == null) { throw new ArgumentNullException("action"); }
var disposable = new AnonymousDisposable(action);
this.Add(disposable);
return disposable;
}
public IDisposable Add<TDelegate>(Action<TDelegate> add, Action<TDelegate> remove, TDelegate handler)
{
if (add == null) { throw new ArgumentNullException("add"); }
if (remove == null) { throw new ArgumentNullException("remove"); }
if (handler == null) { throw new ArgumentNullException("handler"); }
add(handler);
return this.Add(() => remove(handler));
}
public void Clear()
{
lock (_disposables)
{
var disposables = _disposables.ToArray();
_disposables.Clear();
Array.ForEach(disposables, d => d.Dispose());
}
}
public void Dispose()
{
lock (_disposables)
{
if (!_disposed)
{
this.Clear();
}
_disposed = true;
}
}
public IEnumerator<IDisposable> GetEnumerator()
{
lock (_disposables)
{
return _disposables.ToArray().AsEnumerable().GetEnumerator();
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
public bool IsDisposed
{
get
{
return _disposed;
}
}
}