Как я могу получить IObservable, чтобы выдвинуть новейшее значение при подписке - PullRequest
5 голосов
/ 13 ноября 2010

Обычно, когда вы подписываетесь на изменения значения, вам также интересно знать начальное значение. Я хочу, чтобы мой IObservable кэшировал последнее (или начальное) значение и выдавал это значение при подписке.

При использовании простых событий я часто получаю код, похожий на

x.SomeEvent += SomeEventHandler;
SomeEventHandler(x, EventArgs.Empty);

Используя IObservable, я надеялся обернуть событие чем-то, что выдвигает начальное значение. Если у меня несколько подписчиков, они должны получить новейшее значение при подписке У меня есть некоторый код, который работает правильно, если я подписываюсь сразу после создания IObservable, но не, если событие запускается перед подпиской:

class Program
{
    static void Main()
    {
        var s = new Source { Value = 1 };
        var values = Observable.Return(s.Value).Concat(
            Observable.FromEvent(
                h => s.ValueChanged += h,
                h => s.ValueChanged -= h)
            .Select(_ => s.Value));
        using (values.Subscribe(Console.WriteLine))
        {
            s.Value = 2; // prints 1,2 as expected
        }
        using (values.Subscribe(Console.WriteLine))
        {
            s.Value = 3; // prints 1,3 - expected 2,3
        }
    }
}

class Source
{
    private int _value;
    public int Value
    {
        get { return _value; }
        set
        {
            if (_value == value)
                return;
            _value = value;
            if (ValueChanged != null)
                ValueChanged(this, EventArgs.Empty);
        }
    }

    public event EventHandler ValueChanged;
}

Как мне создать IObservable, который работает как положено?

1 Ответ

6 голосов
/ 13 ноября 2010

Решение состоит в том, чтобы подписать BehaviorSubject на наблюдаемое и подписать всех наблюдателей на BehaviorSubject. BehaviorSubject запомнит последнее уведомление и уведомит новых наблюдателей о нем при подписке.

Посмотрите на метод расширения Observable.Publish, который имеет параметр initialValue. Это создает IConnectableObservable, который внутренне использует BehaviorSubject.

var s = new Source { Value = 1 };

var values = Observable.FromEvent(h => s.ValueChanged += h,
                                  h => s.ValueChanged -= h)
                       .Select(e => e.NewValue)
                       .Publish(s.Value);

using (values.Connect())                         // subscribes subject to event
{
    using (values.Subscribe(Console.WriteLine))  // subscribes to subject
    {
        s.Value = 2;
    }                                            // unsubscribes from subject

    using (values.Subscribe(Console.WriteLine))  // subscribes to subject
    {
        s.Value = 3;
    }                                            // unsubscribes from subject

}                                            // unsubscribes subject from event

(непроверенные)

...