Как реализован метод расширения Where () в IObservable? - PullRequest
2 голосов
/ 03 апреля 2012

Я пытался понять Rx более глубоко, следуя серии Барта де Сметса MinLinq и Джона Скитса «Reimplementing», я получил хорошее понимание, но ...

Взяв следующий код запример

var onePerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
var evenNums = onePerSecond.Where(x => x % 2 == 0);
evenNums.Subscribe(Console.WriteLine);

С точки зрения эквивланта IEnumerable Я понимаю поток данных MoveNext / Current и из блога мистера Скитса о том, как метод Where может быть реализован с помощью foreach overIEnumerable «this» параметр метода расширения.

Но в случае метода Where IObservable будет ли он содержать код для реализации интерфейса IObserver (или лямбда-эквивланта) и, следовательно, эффективно наблюдать все уведомления отобъект onePerSecond и, в свою очередь, возвращающий IObservable, который содержит только те значения, которые предикат считает истинными?

Любая помощь и мысли очень приветствуются, большое спасибо

Джеймс

Ответы [ 2 ]

3 голосов
/ 03 апреля 2012

Глядя на исходный код с помощью ILSpy, легко понять, как именно это реализовано в Where.Он возвращает новую наблюдаемую, которая фильтрует элементы на основе переданного вами предиката:

public static IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (predicate == null)
    {
        throw new ArgumentNullException("predicate");
    }
    return new AnonymousObservable<TSource>((IObserver<TSource> observer) => source.Subscribe(delegate(TSource x)
    {
        bool flag;
        try
        {
            flag = predicate(x);
        }
        catch (Exception error)
        {
            observer.OnError(error);
            return;
        }
        if (flag)
        {
            observer.OnNext(x);
        }
    }
    , new Action<Exception>(observer.OnError), new Action(observer.OnCompleted)));
}
1 голос
/ 03 апреля 2012

Вот несколько примеров игрушек, чтобы понять, как это работает:

https://github.com/ScottWeinstein/Rx-Demo/tree/master/ImplementWhereDemo

public class WhereObservableLessPedantic<T> : IObservable<T>
{
    private Func<T, bool> _pred;
    private IObservable<T> _stream;

    public WhereObservableLessPedantic(IObservable<T> stream, Func<T, bool> pred)
    {
        _pred = pred;
        _stream = stream;
    }

    public IDisposable Subscribe(IObserver<T> downStreamObserver)
    {
        Action<T> onNext = nextVal =>
        {
            if (_pred(nextVal))
                downStreamObserver.OnNext(nextVal);
        };
        return _stream.Subscribe(onNext);
    }
}


public class WhereObserverPedantic<T> : IObserver<T>
{
    private IObserver<T> _downStreamObserver;
    private Func<T, bool> _pred;

    public WhereObserverPedantic(IObserver<T> downStreamObserver, Func<T, bool> pred)
    {
        _pred = pred;
        _downStreamObserver = downStreamObserver;
    }

    public void OnNext(T value)
    {
        if (_pred(value))
        {
            _downStreamObserver.OnNext(value);
        }
    }

    public void OnCompleted() { }
    public void OnError(Exception error) { }
}
...