Проблема с реактивными наблюдателями расширений - PullRequest
1 голос
/ 14 сентября 2011

Я работал над приложением с использованием Reactive Extensions и столкнулся со следующей проблемой:

скажем, у меня есть два наблюдателя P и Q, я хочу построить третьего наблюдателя R, который, если два значения P приходят без Q, R выводит 0. И если после P приходит Q, R выводит результат метод, передающий эти значения, что-то вроде:

P0    Q0    ->    R0 = f(P0,Q0)    
P1          ->    R1 = 0    
P2    Q1    ->    R2 = f(P2,Q1)    
P3          ->    R3 = 0    
P4          ->    R4 = 0    
P5    Q2    ->    R5 = f(P5,Q2)
(...)

и значения поступают в наблюдатели в следующем порядке:

P0 Q0 P1 P2 Q1 P3 P4 P5 Q2

спасибо за вашу помощь.

Ответы [ 4 ]

1 голос
/ 14 сентября 2011

Общая идея проста: вы объединяете P и Q, используете BufferWithCount (2), чтобы получить пары значений, а затем обрабатываете пары в соответствии с вашей логикой:


P.Merge(Q).BufferWithCount(2).Select(values =>
{
    var first = values[0];
    var second = values[1];
    if (first is P && second is P ||
        first is Q && second is Q)
    {
        return 0;
    }

    if (first is P)
    {
        return selector(first, second);
    }
    else // suppose Q, P is a valid sequence as well.
    {
        return selector(second, first);
    }
});

Теперь сложная часть состоит в объединенииP и Q, если они разного типа, а затем различают их в Select.Если они относятся к одному и тому же типу, вы можете использовать что-то простое, например, подход, предложенный Enigmativity, то есть


var pqs =
    (from p in ps select new { k = "p", v = p })
        .Merge(from q in qs select new { k = "q", v = q });

Теперь сложная часть, если они разных типов, для объединения их нам понадобится некоторая общая оболочкатипа, что-то вроде, например, Data.Either от Haskell:


public abstract class Either<TLeft, TRight>
{
    private Either()
    {
    }

    public static Either<TLeft, TRight> Create(TLeft value)
    {
        return new Left(value);
    }

    public static Either<TLeft, TRight> Create(TRight value)
    {
        return new Right(value);
    }

    public abstract TResult Match<TResult>(
        Func<TLeft, TResult> onLeft,
        Func<TRight, TResult> onRight);

    public sealed class Left : Either<TLeft, TRight>
    {
        public Left(TLeft value)
        {
            this.Value = value;
        }

        public TLeft Value
        {
            get;
            private set;
        }

        public override TResult Match<TResult>(
            Func<TLeft, TResult> onLeft,
            Func<TRight, TResult> onRight)
        {
            return onLeft(this.Value);
        }
    }

    public sealed class Right : Either<TLeft, TRight>
    {
        public Right(TRight value)
        {
            this.Value = value;
        }

        public TRight Value
        {
            get;
            private set;
        }

        public override TResult Match<TResult>(
            Func<TLeft, TResult> onLeft,
            Func<TRight, TResult> onRight)
        {
            return onRight(this.Value);
        }
    }
}

Достаточно забавно, в System.Reactive.dll уже есть похожий класс Either, к сожалению, он внутренний, поэтому нам нужнонаша собственная реализация.Теперь мы можем поместить как P, так и Q в Either и продолжить без решения (я немного обобщил его, поэтому вы можете вернуть любой результат вместо int):


public static IObservable<TResult> SmartZip<TLeft, TRight, TResult>(
    IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource,
    Func<TLeft, TRight, TResult> selector)
{
    return Observable
        .Merge(
            leftSource.Select(Either<TLeft, TRight>.Create),
            rightSource.Select(Either<TLeft, TRight>.Create))
        .BufferWithCount(2)
        .Select(values =>
            {
                // this case was not covered in your question,
                // but I've added it for the sake of completeness.
                if (values.Count < 2)
                {
                    return default(TResult);
                }

                var first = values[0];
                var second = values[1];

                // pattern-matching in C# is really ugly.
                return first.Match(
                    left => second.Match(
                        _ => default(TResult),
                        right => selector(left, right)),
                    right => second.Match(
                        left => selector(left, right),
                        _ => default(TResult)));
            });
}

А вот небольшойдемо для всего этого страшного уродливого материала.


private static void Main(string[] args)
{
    var psource = Observable
        .Generate(1, i => i < 100, i => i, i => i + 1)
        .Zip(Observable.Interval(TimeSpan.FromMilliseconds(10.0)), (i, _) => i);
    var qsource = Observable
        .Generate(1, i => i < 100, i => (double)i * i, i => i + 1)
        .Zip(Observable.Interval(TimeSpan.FromMilliseconds(30.0)), (i, _) => i);

    var result = SmartZip(
        psource,
        qsource,
        (p, q) => q / p).ToEnumerable();
    foreach (var item in result)
    {
        Console.WriteLine(item);
    }
}

1 голос
/ 14 сентября 2011

Я думаю, у меня есть решение для вас.

Если я предполагаю, что вы определили следующее:

IObservable<int> ps = ...;
IObservable<int> qs = ...;

Func<int, int, int> f = ...;

Сначала я создаю словарь функций для вычисления окончательных значений:

var fs = new Dictionary<string, Func<int, int, int?>>()
{
    { "pp", (x, y) => 0 },
    { "pq", (x, y) => f(x, y) },
    { "qp", (x, y) => null },
    { "qq", (x, y) => null },
};

Здесь есть каждая комбинация "p" и "q".

Затем вы можете создать объединенную наблюдаемую форму, подобную этой:

var pqs =
    (from p in ps select new { k = "p", v = p })
        .Merge(from q in qs select new { k = "q", v = q });

Теперь я знаю, какая последовательность произвела какое значение.

Затем я публикую объединенный список, так как не знаю, являются ли наблюдаемые источника горячими или холодными - поэтому их публикация делает их горячими - и затем я застегиваю публикуемую наблюдаемую себе, пропуская один и ноль соответственно. Затем я знаю каждую пару значений и исходные наблюдаемые, из которых они произошли. Тогда легко применить функции словаря (отфильтровывая любые нулевые значения).

Вот оно:

var rs =
    from kvv in pqs.Publish(_pqs =>
        _pqs.Skip(1).Zip(_pqs, (pq1, pq0) => new
        {
            k = pq0.k + pq1.k,
            v1 = pq1.v,
            v0 = pq0.v
        }))
    let r = fs[kvv.k](kvv.v0, kvv.v1)
    where r.HasValue
    select r.Value;

Это работает для вас?

0 голосов
/ 17 сентября 2011

Предположим, у нас есть два метода

  1. До , Объединяет две наблюдаемые последовательности в одну наблюдаемую последовательность с помощью функции селектора всякий раз, когда первая наблюдаемая создает элемент сразу перед вторым..
  2. Без , Объединяет наблюдаемую последовательность в другую наблюдаемую последовательность каждый раз, когда два элемента поступают вместе из первой наблюдаемой без какого-либо элемента из второй.

С помощью этих методов проблема почти решена.

IObservable<TP> P = // observer P
IObservable<TQ> Q = // observer Q

var PP = P.Without((prev, next) => 0, Q);
var PQ = P.Before(Q, (p,q) => f(p,q)); // apply the function

var ResultSecuence = PP.Merge(PQ);

И вот два метода

public static class Observer
{
    /// <summary>
    /// Merges two observable sequences into one observable sequence by using the selector function 
    /// whenever the first observable produces an element rigth before the second one.
    /// </summary>
    /// <param name="first"> First observable source.</param>
    /// <param name="second">Second observable source.</param>
    /// <param name="resultSelector">Function to invoke whenever the first observable produces an element rigth before the second one.</param>
    /// <returns>
    /// An observable sequence containing the result of combining elements of both sources 
    /// using the specified result selector function.
    /// </returns>
    public static IObservable<TResult> Before<TLeft, TRight, TResult>(this IObservable<TLeft> first, IObservable<TRight> second, Func<TLeft, TRight, TResult> resultSelector)
    {
        var result = new Subject<TResult>();

        bool firstCame = false;
        TLeft lastLeft = default(TLeft);

        first.Subscribe(item =>
        {
            firstCame = true;
            lastLeft = item;
        });

        second.Subscribe(item =>
        {
            if (firstCame)
                result.OnNext(resultSelector(lastLeft, item));

            firstCame = false;
        });

        return result;
    }

    /// <summary>
    /// Merges an observable sequence into one observable sequence by using the selector function 
    /// every time two items came from <paramref name="first"/> without any item of any observable
    /// in <paramref name="second"/>
    /// </summary>
    /// <param name="first"> Observable source to merge.</param>
    /// <param name="second"> Observable list to ignore.</param>
    /// <param name="resultSelector">Function to invoke whenever the first observable produces two elements without any of the observables in the secuence produces any element</param>
    /// <returns>
    /// An observable sequence containing the result of combining elements
    /// using the specified result selector function.
    /// </returns>
    public static IObservable<TResult> Without<TLeft, TResult>(this IObservable<TLeft> first,  Func<TLeft, TLeft, TResult> resultSelector,params IObservable<object>[] second)
    {
        var result = new Subject<TResult>();

        bool firstCame = false;
        TLeft lastLeft = default(TLeft);

        first.Subscribe(item =>
        {
            if (firstCame)
                result.OnNext(resultSelector(lastLeft, item));

            firstCame = true;
            lastLeft = item;
        });

        foreach (var observable in second)
            observable.Subscribe(item => firstCame = false);

        return result;
    }        
}
0 голосов
/ 15 сентября 2011

Если я правильно понял ваш вопрос, то ниже приведена общая функция, которая может обрабатывать такие случаи:

public static IObservable<T> MyCombiner<T>(IObservable<T> P, IObservable<T> Q, T defaultValue,Func<T,T,T> fun)
        {
            var c = P.Select(p => new { Type = 'P', Value = p })
                        .Merge(Q.Select(p => new { Type = 'Q', Value = p }));
            return c.Zip(c.Skip(1), (a, b) =>
            {
                if (a.Type == 'P' && b.Type == 'P')
                    return new { Ok = true, Value = defaultValue };
                if (a.Type == 'P' && b.Type == 'Q')
                    return new { Ok = true, Value = fun(a.Value, b.Value) };
                else
                    return new { Ok = false, Value = default(T) };
            }).Where(b => b.Ok).Select(b => b.Value);

        }
...