Получить предыдущий элемент в IObservable без переоценки последовательности - PullRequest
13 голосов
/ 12 мая 2010

В последовательности IObservable (в Reactive Extensions для .NET) я хотел бы получить значения предыдущих и текущих элементов, чтобы я мог их сравнить. Я нашел в Интернете пример, аналогичный приведенному ниже, который выполняет задачу:

sequence.Zip(sequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur })

Он работает нормально, за исключением того, что он оценивает последовательность дважды, чего я хотел бы избежать. Вы можете видеть, что он оценивается дважды с помощью этого кода:

var debugSequence = sequence.Do(item => Debug.WriteLine("Retrieved an element from sequence"));
debugSequence.Zip(debugSequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }).Subscribe();

Выходные данные показывают в два раза больше строк отладки, чем элементов в последовательности.

Я понимаю, почему это происходит, но до сих пор я не нашел альтернативы, которая бы не оценивала последовательность дважды. Как я могу объединить предыдущий и текущий только с одной оценкой последовательности?

Ответы [ 5 ]

25 голосов
/ 16 мая 2013

Я думаю, что есть лучшее решение, которое использует Observable.Scan и позволяет избежать двойной подписки:

public static IObservable<Tuple<TSource, TSource>>
    PairWithPrevious<TSource>(this IObservable<TSource> source)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (acc, current) => Tuple.Create(acc.Item2, current));
}

Я написал это в своем блоге здесь: http://www.zerobugbuild.com/?p=213

Добавление

Дальнейшая модификация позволяет более аккуратно работать с произвольными типами с помощью селектора результатов:

public static IObservable<TResult> CombineWithPrevious<TSource,TResult>(
    this IObservable<TSource> source,
    Func<TSource, TSource, TResult> resultSelector)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (previous, current) => Tuple.Create(previous.Item2, current))
        .Select(t => resultSelector(t.Item1, t.Item2));
}
3 голосов
/ 13 мая 2010

Двойная оценка является показателем наблюдаемой холодности . Вы можете включить его в Горячий, используя .Publish ():

var pub = sequence.Publish();
pub.Zip(pub.Skip(1), (...
pub.Connect();
2 голосов
/ 21 августа 2015

@ Приложение James World выглядит великолепно, если бы не Tuple<>, который мне почти всегда не нравится: " Был ли .Item1 предыдущим? Или это был текущий? Я не помню. И что первый аргумент селектора, это был предыдущий элемент?".

Для этой части мне понравилось определение @dcstraw выделенного ItemWithPrevious<T>. Итак, вы можете соединить их вместе (надеюсь, я не перепутал предыдущее с текущим) с некоторыми переименованиями и возможностями:

public static class ObservableExtensions
{
    public static IObservable<SortedPair<TSource>> CombineWithPrevious<TSource>(
        this IObservable<TSource> source, 
        TSource initialValue = default(TSource))
    {
        var seed = SortedPair.Create(initialValue, initialValue);

        return source.Scan(seed,
            (acc, current) => SortedPair.Create(current, acc.Current));
    }

    public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(
        this IObservable<TSource> source,
        Func<SortedPair<TSource>, TResult> resultSelector,
        TSource initialValue = default(TSource))
    {
        var seed = SortedPair.Create(initialValue, initialValue);

        return source
            .Scan(seed,
                (acc, current) => SortedPair.Create(current, acc.Current))
            .Select(p => resultSelector(p));
    }
}

public class SortedPair<T>
{
    public SortedPair(T current, T previous)
    {
        Current = current;
        Previous = previous;
    }

    public SortedPair(T current) : this(current, default(T)) { }

    public SortedPair() : this(default(T), default(T)) { }

    public T Current;
    public T Previous;
}

public class SortedPair
{
    public static SortedPair<T> Create<T>(T current, T previous)
    {
        return new SortedPair<T>(current, previous);
    }

    public static SortedPair<T> Create<T>(T current)
    {
        return new SortedPair<T>(current);
    }

    public static SortedPair<T> Create<T>()
    {
        return new SortedPair<T>();
    }
}
0 голосов
/ 12 мая 2010

Оказывается, вы можете использовать переменную для хранения предыдущего значения, обращения к нему и переназначения его в цепочке расширений IObservable. Это даже работает в рамках вспомогательного метода. Используя приведенный ниже код, я могу теперь вызывать CombineWithPrevious() на моем IObservable, чтобы получить ссылку на предыдущее значение, без переоценки последовательности.

public class ItemWithPrevious<T>
{
    public T Previous;
    public T Current;
}

public static class MyExtensions
{
    public static IObservable<ItemWithPrevious<T>> CombineWithPrevious<T>(this IObservable<T> source)
    {
        var previous = default(T);

        return source
            .Select(t => new ItemWithPrevious<T> { Previous = previous, Current = t })
            .Do(items => previous = items.Current);
    }
}
0 голосов
/ 12 мая 2010

Если вам нужен только доступ к предыдущему элементу во время подписки, это, вероятно, самая простая вещь, которая будет работать. (Я уверен, что есть лучший способ, может быть, оператор буфера в IObservable? Документация на данный момент довольно скудная, поэтому я не могу вам сказать.)

    EventArgs prev = null;

    sequence.Subscribe(curr => 
    {
        if (prev != null)
        {
            // Previous and current element available here
        }

        prev = curr;                              

    });

EventArgs - это просто тип аргумента вашего события.

...