Создание слабой подписки на IObservable - PullRequest
20 голосов
/ 06 сентября 2011

Что я хочу сделать, так это убедиться, что если единственной ссылкой на моего наблюдателя является наблюдаемая, он получает мусор и прекращает прием сообщений.

Скажем, у меня есть элемент управления со списком, который называется «Сообщения», и этот код:

//Short lived display of messages (only while the user's viewing incoming messages)
public partial class MessageDisplay : UserControl
{
    public MessageDisplay()
    {
        InitializeComponent();
        MySource.IncomingMessages.Subscribe(m => Messages.Items.Add(m));
    }
}

Который подключается к этому источнику:

//Long lived location for message store
static class MySource
{
    public readonly static IObservable<string> IncomingMessages = new ReplaySubject<string>;
}

Чего я не хочу, так это чтобы дисплей сообщений оставался в памяти надолго после того, как он больше не виден. В идеале я хотел бы немного расширения, чтобы я мог написать:

MySource.IncomingMessages.ToWeakObservable().Subscribe(m => Messages.Items.Add(m));

Я также не хочу полагаться на тот факт, что MessageDisplay является пользовательским элементом управления, так как позже я захочу перейти к установке MVVM с MessageDisplayViewModel, которая не будет пользовательским элементом управления.

Ответы [ 5 ]

13 голосов
/ 06 сентября 2011

Вы можете подписать наблюдателя-посредника на наблюдаемую, которая содержит слабую ссылку на фактического наблюдателя и удаляет подписку, когда фактического наблюдателя больше нет в живых:

static IDisposable WeakSubscribe<T>(
    this IObservable<T> observable, IObserver<T> observer)
{
    return new WeakSubscription<T>(observable, observer);
}

class WeakSubscription<T> : IDisposable, IObserver<T>
{
    private readonly WeakReference reference;
    private readonly IDisposable subscription;
    private bool disposed;

    public WeakSubscription(IObservable<T> observable, IObserver<T> observer)
    {
        this.reference = new WeakReference(observer);
        this.subscription = observable.Subscribe(this);
    }

    void IObserver<T>.OnCompleted()
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnCompleted();
        else this.Dispose();
    }

    void IObserver<T>.OnError(Exception error)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnError(error);
        else this.Dispose();
    }

    void IObserver<T>.OnNext(T value)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnNext(value);
        else this.Dispose();
    }

    public void Dispose()
    {
        if (!this.disposed)
        {
            this.disposed = true;
            this.subscription.Dispose();
        }
    }
}
2 голосов
/ 08 апреля 2015

это моя реализация (выход из простой)

public class WeakObservable<T>: IObservable<T>
{
    private IObservable<T> _source;

    public WeakObservable(IObservable<T> source)
    {
        #region Validation

        if (source == null)
            throw new ArgumentNullException("source");

        #endregion Validation

        _source = source;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IObservable<T> source = _source;
        if(source == null)
            return Disposable.Empty;
        var weakObserver = new WaekObserver<T>(observer);
        IDisposable disp = source.Subscribe(weakObserver);
        return disp;
    }
}
    public class WaekObserver<T>: IObserver<T>
{
    private WeakReference<IObserver<T>> _target;

    public WaekObserver(IObserver<T> target)
    {
        #region Validation

        if (target == null)
            throw new ArgumentNullException("target");

        #endregion Validation

        _target = new WeakReference<IObserver<T>>(target);
    }

    private IObserver<T> Target
    {
        get
        {
            IObserver<T> target;
            if(_target.TryGetTarget(out target))
                return target;
            return null;
        }
    }

    #region IObserver<T> Members

    /// <summary>
    /// Notifies the observer that the provider has finished sending push-based notifications.
    /// </summary>
    public void OnCompleted()
    {
        IObserver<T> target = Target;
        if (target == null)
            return;

        target.OnCompleted();
    }

    /// <summary>
    /// Notifies the observer that the provider has experienced an error condition.
    /// </summary>
    /// <param name="error">An object that provides additional information about the error.</param>
    public void OnError(Exception error)
    {
        IObserver<T> target = Target;
        if (target == null)
            return;

        target.OnError(error);
    }

    /// <summary>
    /// Provides the observer with new data.
    /// </summary>
    /// <param name="value">The current notification information.</param>
    public void OnNext(T value)
    {
        IObserver<T> target = Target;
        if (target == null)
            return;

        target.OnNext(value);
    }

    #endregion IObserver<T> Members
}
    public static class RxExtensions
{
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> source)
    {
        return new WeakObservable<T>(source);
    }
}
        static void Main(string[] args)
    {
        Console.WriteLine("Start");
        var xs = Observable.Interval(TimeSpan.FromSeconds(1));
        Sbscribe(xs);

        Thread.Sleep(2020);
        Console.WriteLine("Collect");
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();
        Console.WriteLine("Done");
        Console.ReadKey();
    }

    private static void Sbscribe<T>(IObservable<T> source)
    {
        source.ToWeakObservable().Subscribe(v => Console.WriteLine(v));
    }
2 голосов
/ 24 мая 2014

Пару лет спустя наткнулся на эту тему ... просто хотел указать на решение, указанное в блоге Сэмюэла Джека , которое добавляет метод расширения IObservable под названием WeaklySubscribe.Он использует подход добавления прокладки между субъектом и наблюдателем, который отслеживает цель с помощью WeakReference.Это похоже на решения, предложенные другими для решения проблемы сильных ссылок в подписках на события, например, в этой статье или в этом решении Пола Стовелла .Какое-то время я использовал что-то, основанное на подходе Пола, и мне нравится решение Самуэля о слабых IObservable подписках.

0 голосов
/ 02 ноября 2017

Ключ в том, чтобы распознать, что вам нужно будет передать как целевое, так и двухпараметрическое действие.Действие с одним параметром никогда не сделает этого, потому что вы либо используете слабую ссылку на свое действие (а действие получает GC'd), либо вы используете сильную ссылку на свое действие, которое, в свою очередь, имеет сильную ссылку на цельтак что цель не может получить GC'd.Имея это в виду, следующие работы:

using System;

namespace Closures {
  public static class WeakReferenceExtensions {
    /// <summary> returns null if target is not available. Safe to call, even if the reference is null. </summary>
    public static TTarget TryGetTarget<TTarget>(this WeakReference<TTarget> reference) where TTarget : class {
      TTarget r = null;
      if (reference != null) {
        reference.TryGetTarget(out r);
      }
      return r;
    }
  }
  public static class ObservableExtensions {

    public static IDisposable WeakSubscribe<T, U>(this IObservable<U> source, T target, Action<T, U> action)
      where T : class {
      var weakRef = new WeakReference<T>(target);
      var r = source.Subscribe(u => {
        var t = weakRef.TryGetTarget();
        if (t != null) {
          action(t, u);
        }
      });
      return r;
    }
  }
}

Наблюдаемый образец:

using System;
using System.Reactive.Subjects;

namespace Closures {
  public class Observable {
    public IObservable<int> ObservableProperty => _subject;
    private Subject<int> _subject = new Subject<int>();
    private int n;
    public void Fire() {
      _subject.OnNext(n++);
    }
  }
}

Использование:

Class SomeClass {

 IDisposable disposable;

 public void SomeMethod(Observable observeMe) {
   disposable = observeMe.ObservableProperty.WeakSubscribe(this, (wo, n) => wo.Log(n));
 }

  public void Log(int n) {
    System.Diagnostics.Debug.WriteLine("log "+n);
  }
}
0 голосов
/ 06 сентября 2011

Код ниже вдохновлен оригинальным постом dtb. Единственное изменение заключается в том, что он возвращает ссылку на наблюдателя как часть IDisposable. Это означает, что ссылка на IObserver будет сохраняться до тех пор, пока вы сохраняете ссылку на IDisposable, который вы получаете в конце цепочки (при условии, что все одноразовые материалы сохраняют ссылку на одноразовые до них). Это позволяет использовать методы расширения, такие как Subscribe(M=>DoSomethingWithM(M)), потому что мы сохраняем ссылку на неявно созданный IObserver, но мы не храним строгую ссылку от источника на IObserver (что приведет к памяти).

using System.Reactive.Linq;

static class WeakObservation
{
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> observable)
    {
        return Observable.Create<T>(observer =>
            (IDisposable)new DisposableReference(new WeakObserver<T>(observable, observer), observer)
            );
    }
}

class DisposableReference : IDisposable
{
    public DisposableReference(IDisposable InnerDisposable, object Reference)
    {
        this.InnerDisposable = InnerDisposable;
        this.Reference = Reference;
    }

    private IDisposable InnerDisposable;
    private object Reference;

    public void Dispose()
    {
        InnerDisposable.Dispose();
        Reference = null;
    }
}

class WeakObserver<T> : IObserver<T>, IDisposable
{
    private readonly WeakReference reference;
    private readonly IDisposable subscription;
    private bool disposed;

    public WeakObserver(IObservable<T> observable, IObserver<T> observer)
    {
        this.reference = new WeakReference(observer);
        this.subscription = observable.Subscribe(this);
    }

    public void OnCompleted()
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnCompleted();
        else this.Dispose();
    }

    public void OnError(Exception error)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnError(error);
        else this.Dispose();
    }

    public void OnNext(T value)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnNext(value);
        else this.Dispose();
    }

    public void Dispose()
    {
        if (!this.disposed)
        {
            this.disposed = true;
            this.subscription.Dispose();
        }
    }
}
...