CombineLatest, но только толчок влево - PullRequest
7 голосов
/ 08 июля 2011

Мне нужно реализовать версию CombineLatest (я назову ее WithLatest здесь), которая вызывает селектор для каждого элемента слева и последний элемент справа. Не следует нажимать только на предметы справа, меняющиеся.

Я думаю, построено ли это Observable.Create или комбинация существующих расширений не особенно важна; Я сделаю этот метод расширения в штучной упаковке в любом случае.

Пример

var left = new Subject<int>();
var right = new Subject<int>();

left.WithLatest(right, (l,r) => l + " " + r).Dump();

left.OnNext(1);   // <1>
left.OnNext(2);   // <2>
right.OnNext(1);  // <3>
right.OnNext(2);  // <4>
left.OnNext(3);   // <5>

должен дать

2 1
3 2

Редактировать : логика моего примера звучит так:

  1. Слева заполняется 1. Справа пусто, значения не нажимаются.
  2. Слева обновляется с 2 (он забывает предыдущее значение). Право еще пусто, поэтому ничего не нажимается.
  3. Справа заполняется 1, поэтому слева = 2 (последнее значение), справа = 1 нажата. До этого момента нет никакой разницы между WithLatest и CombineLatest
  4. Право обновлено - ничего не нажимается. Это то, что отличается
  5. Слева обновляется с 3, поэтому слева = 3, справа = 2 (последнее значение) нажата.

Было предложено попробовать:

var lr = right.ObserveOn(Scheduler.TaskPool).Latest();
left.Select(l => l + " " + lr.First()).Dump();

но это блокирует текущий поток для моего теста.

Ответы [ 6 ]

5 голосов
/ 28 июля 2015

Вы можете сделать это с помощью существующих операторов.

Func<int, int, string> selector = (l, r) => l + " " + r;

var query = right.Publish(rs => left.Zip(rs.MostRecent(0), selector).SkipUntil(rs));
  • Publish гарантирует, что мы когда-либо подписываемся только на right один раз и разделяем подписку среди всех подписчиков на rs.

  • MostRecent превращает IObservable<T> в IEnumerable<T>, который всегда возвращает последнее испущенное значение из наблюдаемого источника.

  • Zip между IObservable<T> и IEnumerable<U> выдает значение каждый раз, когда наблюдаемое излучает значение.

  • SkipUntil пропускает пары (l, r), которые встречаются до rightкогда-либо выдает значение.

4 голосов
/ 12 августа 2013

У меня тоже была такая же потребность в CombineLatest, который "толкает только влево".

Я сделал решение "перегрузкой" Observable.Sample, потому что это то, что делает метод:
Он производит выборку source (справа) с помощью sampler (слева) с дополнительной возможностью обеспечения resultSelector (как в CombineLatest).

public static IObservable<TResult> Sample<TSource, TSample, TResult>(
    this IObservable<TSource> source,
    IObservable<TSample> sampler,
    Func<TSource, TSample, TResult> resultSelector)
{
    var multiSampler = sampler.Publish().RefCount();
    return source.CombineLatest(multiSampler, resultSelector).Sample(multiSampler);
}
1 голос
/ 11 августа 2014

Основываясь на решении, выбранном автором сообщения, я думаю, что есть еще более простое решение, использующее DistinctUntilChanged :

public static IObservable<TResult> CombineLatestOnLeft<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) {
        return leftSource
            .Select<TLeft, Tuple<TLeft, int>>(Tuple.Create<TLeft, int>)
            .CombineLatest(rightSource,
                (l, r) => new { Index = l.Item2, Left = l.Item1, Right = r })
            .DistinctUntilChanged(x => x.Index)
            .Select(x => selector(x.Left, x.Right));
    }

или даже

public static IObservable<TResult> CombineLatestOnLeft<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) {
        return leftSource
            .CombineLatest(rightSource,
                (l, r) => new { Left = l, Right = r })
            .DistinctUntilChanged(x => x.Left)
            .Select(x => selector(x.Left, x.Right));
    }

, если вызаботятся только о различных значениях leftSource

0 голосов
/ 07 января 2018

На последних System.Reactive , мы можем использовать WithLatestFrom метод расширения.

left.WithLatestFrom(right, (l, r) => l + " " + r).Dump();

Результат будет ниже правильно.

3 2 
0 голосов
/ 26 мая 2016

Сегодня я сделал RX-оператор для проекта, который делает это.

Вот мои решения:

    public static IObservable<Tuple<TSource, TTarget>> JoinLeftSoft<TSource, TTarget>(
        this IObservable<TSource> source, IObservable<TTarget> right)
    {
        return source
            .Select(x => new Tuple<object, TSource>(new object(), x))
            .CombineLatest(right, (l, r) => new Tuple<object, TSource, TTarget>(l.Item1, l.Item2, r))
            .DistinctUntilChanged(t => t.Item1)
            .Select(t => new Tuple<TSource, TTarget>(t.Item2, t.Item3));
    }
0 голосов
/ 08 июля 2011

Вот хакерский способ использования Create - на самом деле его не строили, извините, если он на самом деле не работает:)

public static IObservable<TRet> WithLatest<TLeft, TRight, TRet>(
        this IObservable<TLeft> lhs, 
        IObservable<TRight> rhs, 
        Func<TLeft, TRight, TRet> sel)
{
    return Observable.Create<TRet>(subj => {
        bool rhsSet = false;
        bool deaded = false;
        var latestRhs = default(TRight);

        Action onDeaded = null;

        var rhsDisp = rhs.Subscribe(
            x => { latestRhs = x; rhsSet = true; }, 
            ex => { subj.OnError(ex); onDeaded(); });

        var lhsDisp = lhs
            .Where(_ => deaded == false && rhsSet == true)
            .Subscribe(
                x => subj.OnNext(sel(x, latestRhs)),
                ex => { subj.OnError(ex); onDeaded(); },
                () => { subj.OnCompleted(); onDeaded(); });

        onDeaded = () => {
            deaded = true;
            if (lhsDisp != null) {
                lhsDisp.Dispose();
                lhsDisp = null;
            }
            if (rhsDisp != null) {
                rhsDisp.Dispose();
                rhsDisp = null;
            }
        };

        return onDeaded;
    });
}
...