Привет. Я пытаюсь объединить несколько наблюдаемых в наблюдаемый массив. Вот пример, который работает в ФСИ. (извините, что это долго)
#r "./bin/Debug/System.Reactive.dll"
open System
open System.Reactive.Linq
/// Subscribes to the Observable with all 3 callbacks.
let subscribeComplete next error completed (observable: IObservable<'T>) =
observable.Subscribe(
(fun x -> next x),
(fun e -> error e),
(fun () -> completed()))
/// Subscribes to the Observable with a next and an error-function.
let subscribeWithError next error observable =
subscribeComplete next error (fun () -> ()) observable
/// Subscribes to the Observable with a next-function
let subscribe (next: 'T -> unit) (observable: IObservable<'T>) : IDisposable =
subscribeWithError next ignore observable
/// Static method to generate observable from input functions
let ObsGenerate (initState: 'TS) (termCond: 'TS -> bool) (iterStep: 'TS -> 'TS)
(resSelect: 'TS -> 'TR) (timeSelect : 'TS -> System.TimeSpan) =
Observable.Generate(initState, termCond, iterStep, resSelect, timeSelect)
//maps the given observable with the given function
let obsMap (f: 'T -> 'U) (observable : IObservable<'T>) : IObservable<'U> =
Observable.Select(observable, Func<_,_>(f))
/// Merges two observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest (obs1: IObservable<'T>) (obs2: IObservable<'U>) : IObservable<'T * 'U> =
Observable.CombineLatest(
obs1, obs2, Func<_,_,_>(fun a b -> a, b))
/// Merges three observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest3 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) : IObservable<'T * 'U * 'V> =
let obs12 =obs1.CombineLatest(obs2, Func<_,_,_>(fun a b -> a, b))
obs12.CombineLatest(obs3, Func<_,_,_>(fun (a,b) c -> a, b, c))
/// Merges four observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest4 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) (obs4: IObservable<'W>) : IObservable<'T * 'U * 'V * 'W> =
let obsNew = combineLatest3 obs1 obs2 obs3
obsNew.CombineLatest(obs4, Func<_,_,_>(fun (a,b,c) d -> a, b, c, d))
// second section generating arrays
let combineLatestArray (obs1: IObservable<'T>) (obs2: IObservable<'T>) =
combineLatest obs1 obs2
|> obsMap (fun (a, b) -> [a; b] |> List.toArray)
let combineLatest3Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) =
combineLatest3 obs1 obs2 obs3
|> obsMap (fun (a, b, c) -> [a; b; c] |> List.toArray)
let combineLatest4Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) (obs4: IObservable<'T>) =
combineLatest4 obs1 obs2 obs3 obs4
|> obsMap (fun (a, b, c, d) -> [a; b; c; d] |> List.toArray)
let combineLatestListToArray (list: IObservable<'T> List) =
match list.Length with
| 2 -> combineLatestArray list.[0] list.[1]
| 3 -> combineLatest3Array list.[0] list.[1] list.[2]
| 4 -> combineLatest4Array list.[0] list.[1] list.[2] list.[3]
| _ -> failwith "combine latest on unsupported list size"
type FooType =
{ NameVal : string
IdVal : int
RetVal : float }
member x.StringKey() =
x.NameVal.ToString() + ";" + x.IdVal.ToString()
// example code starts here
let rnd = System.Random()
let fooListeners = Collections.Generic.Dictionary()
let AddAFoo (foo : FooType) =
let fooId = foo.StringKey()
if fooListeners.ContainsKey(fooId)
then fooListeners.[fooId]
else
let myObs = ObsGenerate {NameVal = foo.NameVal; IdVal = foo.IdVal; RetVal = foo.RetVal} (fun x -> true) (fun x -> {NameVal = (x.NameVal); IdVal = (x.IdVal); RetVal = (x.RetVal + rnd.NextDouble() - 0.5)}) (fun x -> x) (fun x -> System.TimeSpan.FromMilliseconds(rnd.NextDouble() * 2000.0))
fooListeners.Add(fooId,myObs)
myObs
let fooInit = [6..9]
|> List.map (fun index -> {NameVal = (string index + "st"); IdVal = index; RetVal = (float index + 1.0)})
|> List.map (fun foo -> AddAFoo foo)
let fooValuesArray = fooInit
|> List.map(fun x -> (x |> obsMap (fun x -> x.RetVal)))
|> combineLatestListToArray
let mySub =
fooValuesArray
|> subscribe (fun fooVals -> printfn "fooArray: %A" fooVals)
//execute until here to start example
// execute this last line to unsubscribe
mySub.Dispose()
У меня есть два вопроса об этом коде:
Есть ли более разумный способ объединения наблюдаемых в массивы? (это становится очень длинным, поскольку мне нужно объединить большие массивы)
Я хочу ограничить обновления. Под этим я подразумеваю, что я хочу, чтобы все обновления, которые происходят внутри (скажем) одной и той же половины секунды, обрабатывались как одно обновление массива. В идеале, я хочу, чтобы это окно открывалось только при поступлении первого обновления, то есть, если в течение 2 секунд не поступает никаких обновлений, то приходит одно обновление, затем мы ждем и включаем дополнительные обновления в течение 0,5 секунд, а затем запускаем наблюдаемое. Я не хочу, чтобы он публиковался периодически каждые 0,5 секунды, хотя никаких видимых событий не происходит. Надеюсь, это описание достаточно понятно.
обновление: я решил принять один из ответов F #, но я еще не оправдал ответы C #. Я надеюсь, что скоро смогу их проверить.