объединить несколько наблюдаемых в наблюдаемый массив - PullRequest
2 голосов
/ 18 августа 2011

Привет. Я пытаюсь объединить несколько наблюдаемых в наблюдаемый массив. Вот пример, который работает в ФСИ. (извините, что это долго)

#r "./bin/Debug/System.Reactive.dll"

open System
open System.Reactive.Linq

/// Subscribes to the Observable with all 3 callbacks.
let subscribeComplete next error completed (observable: IObservable<'T>) = 
    observable.Subscribe(
        (fun x -> next x),
        (fun e -> error e),
        (fun () -> completed()))

/// Subscribes to the Observable with a next and an error-function.
let subscribeWithError next error observable = 
    subscribeComplete next error (fun () -> ()) observable

/// Subscribes to the Observable with a next-function
let subscribe (next: 'T -> unit) (observable: IObservable<'T>) : IDisposable = 
    subscribeWithError next ignore observable

/// Static method to generate observable from input functions
let ObsGenerate (initState: 'TS) (termCond: 'TS -> bool) (iterStep: 'TS -> 'TS)
        (resSelect: 'TS -> 'TR) (timeSelect : 'TS -> System.TimeSpan) =
            Observable.Generate(initState, termCond, iterStep, resSelect, timeSelect)

//maps the given observable with the given function
let obsMap (f: 'T -> 'U) (observable : IObservable<'T>) : IObservable<'U> =
    Observable.Select(observable, Func<_,_>(f))

/// Merges two observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest (obs1: IObservable<'T>) (obs2: IObservable<'U>) : IObservable<'T * 'U> = 
    Observable.CombineLatest(
        obs1, obs2, Func<_,_,_>(fun a b -> a, b))    

/// Merges three observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest3 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) : IObservable<'T * 'U * 'V> = 
    let obs12 =obs1.CombineLatest(obs2, Func<_,_,_>(fun a b -> a, b))    
    obs12.CombineLatest(obs3, Func<_,_,_>(fun (a,b) c -> a, b, c))    

/// Merges four observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest4 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) (obs4: IObservable<'W>) : IObservable<'T * 'U * 'V * 'W> = 
    let obsNew = combineLatest3 obs1 obs2 obs3
    obsNew.CombineLatest(obs4, Func<_,_,_>(fun (a,b,c) d -> a, b, c, d))    

// second section generating arrays
let combineLatestArray (obs1: IObservable<'T>) (obs2: IObservable<'T>) =
    combineLatest obs1 obs2 
    |> obsMap (fun (a, b) -> [a; b] |> List.toArray)

let combineLatest3Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) =
    combineLatest3 obs1 obs2 obs3 
    |> obsMap (fun (a, b, c) -> [a; b; c] |> List.toArray)

let combineLatest4Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) (obs4: IObservable<'T>) =
    combineLatest4 obs1 obs2 obs3 obs4
    |> obsMap (fun (a, b, c, d) -> [a; b; c; d] |> List.toArray)

let combineLatestListToArray (list: IObservable<'T> List) =
    match list.Length with
    | 2 -> combineLatestArray list.[0] list.[1]
    | 3 -> combineLatest3Array list.[0] list.[1] list.[2]
    | 4 -> combineLatest4Array list.[0] list.[1] list.[2] list.[3]
    | _ -> failwith "combine latest on unsupported list size"

type FooType = 
        {   NameVal :   string
            IdVal   :   int
            RetVal  :   float }

        member x.StringKey() =
            x.NameVal.ToString() + ";" + x.IdVal.ToString()


// example code starts here

let rnd = System.Random()

let fooListeners = Collections.Generic.Dictionary()

let AddAFoo (foo : FooType) =
    let fooId = foo.StringKey()

    if fooListeners.ContainsKey(fooId)
        then fooListeners.[fooId]
    else
        let myObs = ObsGenerate {NameVal = foo.NameVal; IdVal = foo.IdVal; RetVal = foo.RetVal} (fun x -> true) (fun x -> {NameVal = (x.NameVal); IdVal = (x.IdVal); RetVal = (x.RetVal + rnd.NextDouble() - 0.5)}) (fun x -> x) (fun x -> System.TimeSpan.FromMilliseconds(rnd.NextDouble() * 2000.0))
        fooListeners.Add(fooId,myObs)
        myObs

let fooInit =   [6..9]
                |> List.map (fun index -> {NameVal = (string index + "st"); IdVal = index; RetVal = (float index + 1.0)})     
                |> List.map (fun foo -> AddAFoo foo)

let fooValuesArray =    fooInit
                        |> List.map(fun x -> (x |> obsMap (fun x -> x.RetVal)))
                        |> combineLatestListToArray

let mySub =
    fooValuesArray
    |> subscribe (fun fooVals -> printfn "fooArray: %A" fooVals)

//execute until here to start example

// execute this last line to unsubscribe
mySub.Dispose()

У меня есть два вопроса об этом коде:

  1. Есть ли более разумный способ объединения наблюдаемых в массивы? (это становится очень длинным, поскольку мне нужно объединить большие массивы)

  2. Я хочу ограничить обновления. Под этим я подразумеваю, что я хочу, чтобы все обновления, которые происходят внутри (скажем) одной и той же половины секунды, обрабатывались как одно обновление массива. В идеале, я хочу, чтобы это окно открывалось только при поступлении первого обновления, то есть, если в течение 2 секунд не поступает никаких обновлений, то приходит одно обновление, затем мы ждем и включаем дополнительные обновления в течение 0,5 секунд, а затем запускаем наблюдаемое. Я не хочу, чтобы он публиковался периодически каждые 0,5 секунды, хотя никаких видимых событий не происходит. Надеюсь, это описание достаточно понятно.

обновление: я решил принять один из ответов F #, но я еще не оправдал ответы C #. Я надеюсь, что скоро смогу их проверить.

Ответы [ 5 ]

2 голосов
/ 19 августа 2011

Для 1 приложение List.fold и List.toArray и несколько Observable операторов должны работать хорошо. Что-то вроде:

let combineLatest observables =
    Observable.Select(
        (observables 
         |> List.fold (fun ol o 
                         -> Observable.CombineLatest(o, ol, (fun t tl -> t :: tl))
                      ) (Observable.Return<_>([]))
        ), 
        List.toArray)

Из-за вложенности вы можете столкнуться с проблемами производительности, если у вас большой список наблюдаемых, но стоит хотя бы попробовать, прежде чем прибегнуть к написанию этого вручную.

Для 2 я бы согласился с другими ответами, чтобы применить Throttling к результату.

1 голос
/ 19 августа 2011

Хотя Гидеон Энгельберт ответил на ваш вопрос одним из возможных способов решения проблемы. Другим возможным способом может быть что-то вроде ниже, он не использует вложенность.

let combineLatestToArray (list : IObservable<'T> list) = 
    let s = new Subject<'T array>()
    let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
    let cb (i:int,v:'T) = 
        arr.[i] <- v
        s.OnNext(arr |> Array.toList |> List.toArray)
    let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
               |> Observable.Merge
    main.Subscribe(new Action<int * 'T>(cb)
                   ,new Action<exn>(fun e -> s.OnError(e)) 
                   ,new Action(fun () -> s.OnCompleted()) ) |> ignore
    s :> IObservable<'T array>

Дайте мне знать, если это решило вашу проблему, так как я не слишком много ее тестировал :) ПРИМЕЧАНИЕ: это для первой части, для второй части все уже упоминали, что вам нужно сделать

UPDATE: Другая реализация:

let combineLatestToArray (list : IObservable<'T> list) = 
    let s = new Subject<'T array>()
    let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
    let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
               |> Observable.Merge
    async {
        try
            let se = main.ToEnumerable() |> Seq.scan (fun ar (i,t) -> Array.set ar i t; ar) arr
            for i in se do
                s.OnNext(i |> Array.toList |> List.toArray)
            s.OnCompleted()
        with
        | :? Exception as ex -> s.OnError(ex)
    } |> Async.Start
    s :> IObservable<'T array>
1 голос
/ 19 августа 2011

Мне жаль, что это не F # - хотелось бы, чтобы у меня было время, чтобы выучить его - но вот возможный ответ на C #.

Вот набор методов расширения, которые будут объединять последние от IEnumerable<IObservable<T>> до IObservable<IEnumerable<T>>:

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IObservable<T> second)
{
    if (first == null) throw new ArgumentNullException("first");
    if (second == null) throw new ArgumentNullException("second");
    return first.CombineLatest(second, (t0, t1) => EnumerableEx.Return(t0).Concat(EnumerableEx.Return(t1)));
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IObservable<T> second)
{
    if (firsts == null) throw new ArgumentNullException("firsts");
    if (second == null) throw new ArgumentNullException("second");
    return firsts.CombineLatest(second, (t0s, t1) => t0s.Concat(EnumerableEx.Return(t1)));
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources)
{
    if (sources == null) throw new ArgumentNullException("sources");
    return sources.CombineLatest(() => sources.First().CombineLatest(sources.Skip(1)), () => Observable.Empty<IEnumerable<T>>());
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IEnumerable<IObservable<T>> seconds)
{
    if (first == null) throw new ArgumentNullException("first");
    if (seconds == null) throw new ArgumentNullException("seconds");
    return seconds.CombineLatest(() => first.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => first.Select(t => EnumerableEx.Return(t)));
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IEnumerable<IObservable<T>> seconds)
{
    if (firsts == null) throw new ArgumentNullException("firsts");
    if (seconds == null) throw new ArgumentNullException("seconds");
    return seconds.CombineLatest(() => firsts.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => firsts);
}

private static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources, Func<IObservable<IEnumerable<T>>> any, Func<IObservable<IEnumerable<T>>> none)
{
    if (sources == null) throw new ArgumentNullException("sources");
    if (any == null) throw new ArgumentNullException("any");
    if (none == null) throw new ArgumentNullException("none");
    return Observable.Defer(() => sources.Any() ? any() : none());
}

Они могут быть не очень эффективными, но они обрабатывают любое количество наблюдаемых, которые необходимо объединить.

Мне бы очень хотелось, чтобы эти методы были преобразованы в F #.

Что касается вашего второго вопроса, я не уверен, что смогу ответить тем, что вы сказали до сих пор, потому что CombineLatest и Throttle оба теряют значения, так что, вероятно, разумнее понять ваш вариант использования более подробно, прежде чем пытаюсь ответить.

0 голосов
/ 19 августа 2011

Это лучшее, что я мог придумать. Я давно хотел решить эту проблему.

public static class Extensions
{
    public static IObservable<IEnumerable<T>> CombineLatest<T>(this Observable observable, IEnumerable<IObservable<T>> observableCollection)
    {
        return observableCollection.CombineLatest();
    }

    public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> observables)
    {
        return observables.Aggregate<IObservable<T>, IObservable<IEnumerable<T>>>
        (
            Observable.Return(Enumerable.Empty<T>()),
            (o, n) => o.CombineLatest
            (
                n,
                (list, t) => list.Concat(EnumerableEx.Return(t))
            )
        );
    }
}

Вот пример использования:

var obs = new List<IObservable<bool>> 
{ 
    Observable.Return(true), 
    Observable.Return(false), 
    Observable.Return(true) 
};

var result = obs.CombineLatest().Select(list => list.All(x => x));
result.Subscribe(Console.WriteLine);
Console.ReadKey();

Вы должны были бы оперировать знанием, однако, что полученное IObservable<IEnumerable<T>> не сработает, пока все наблюдаемые не дадут значение . Это то, что мне было нужно в моих сценариях, но может не подходить для вашего сценария.

Меня беспокоит это производительность всех .Concats. Может быть более производительным, чтобы иметь дело с изменяемой коллекцией в методе расширения. Точно сказать не могу.

Извините, я не знаю F #. Я вернусь к этому на днях.

Дросселирование выполняется оператором .Throttle после получения окончательной наблюдаемой.

Редактировать: Этот ответ является итеративным Ying к Рекурсивный Enigmativity Yang .

0 голосов
/ 18 августа 2011
  1. Кажется, что Observable.Merge() с перегрузками для переменного числа IObservables ближе к тому, что вы хотите.

  2. Observable.Buffer() с перегрузками по времени будет делать то, что вы хотите здесь. В ситуации «нет событий», Buffer будет по-прежнему OnNext () пустой список, позволяя вам реагировать на это положение.

...