Обычно вы не реализуете IObservable<T>
самостоятельно, вы возвращаете IObservable<T>
из метода, используя один из методов генерации (например, Observable.Create
).
Однако, если вы собираетесь реализовать интерфейс самостоятельно, вам следует обернуть внутренний Subject<T>
, который будет обрабатывать все проблемы параллелизма для вас:
public class CustomObservable<T> : IObservable<T>
{
private Subject<T> subject = new Subject<T>();
public IDisposable Subscribe(IObserver<T> observer)
{
return subject.Subscribe(observer);
}
private void EmitValue(T value)
{
subject.OnNext(value);
}
}
NB: Если вы решили остаться с делегатом (по какой-либо причине), по крайней мере, убедитесь, что вы отменили подписку в возвращаемом значении IDisposable
:
observers += observer.OnNext;
return Disposable.Create(() => observers -= observer.OnNext);