Объединить большое количество наблюдаемых в новые наблюдаемые - PullRequest
6 голосов
/ 03 декабря 2010

У меня есть, скажем, 1000 наблюдаемых.Теперь я хочу объединить все события в новую наблюдаемую, которая запускает OnNext, как только все остальные отправили событие.Какой лучший способ сделать это с помощью Rx?

Обновление : Некоторые отличные отзывы на форуме Rx, особенно Дейва СекстонаОн показал, как создать метод расширения Zip, который принимает несколько наблюдаемых: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/daaa84db-b560-4eda-871e-e523098db20c/

1 Ответ

2 голосов
/ 03 декабря 2010

В F # есть MailboxProcessor ... Я бы использовал SynchronizationContext в C # для той же цели.Дайте мне несколько минут, и я напишу пример.

В сторону: вот мой код на F #, который делает нечто подобное ... Это будет значительно больше усилий, но все же выполнимо в C # с Rx.

open System.Diagnostics

let numWorkers = 20
let asyncDelay = 100

type MessageForMailbox =
   | DataMessage of AsyncReplyChannel<unit>
   | GetSummary of AsyncReplyChannel<unit>

let main =
   let actor =
      MailboxProcessor.Start( fun inbox ->
         let rec loop acc =
            async {
               let! message = inbox.Receive()
               match message with
               | DataMessage replyChannel -> replyChannel.Reply(); return! loop acc
               | GetSummary replyChannel -> replyChannel.Reply(); return! loop acc
            }

         loop 0 // seed for acc
      )

   let codeBlocks = [for i in 1..numWorkers -> 
                        async {
                           do! Async.Sleep asyncDelay
                           return! actor.PostAndAsyncReply DataMessage
                        } ]

   while true do
      printfn "Concurrent started..."
      let sw = new Stopwatch()
      sw.Start()
      codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore
      actor.PostAndReply GetSummary
      sw.Stop()
      printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * 100) / sw.ElapsedMilliseconds)

      printfn "Synchronous started..."
      let sw = new Stopwatch()
      sw.Start()
      for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore
      sw.Stop()
      printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100) / sw.ElapsedMilliseconds)

main
...