Синхронизировать несколько подписок в RX - PullRequest
2 голосов
/ 01 января 2012

Можно ли принудительно заставить несколько подписок RX на разные наблюдаемые запускаться последовательно (не одновременно)?

Мне известно, что я могу использовать EventLoopScheduler для этого, но это снизит производительность, потому что вся обработка будет выполняться в одном потоке.

1 Ответ

1 голос
/ 01 января 2012

Если вы хотите запустить одну наблюдаемую до OnCompleted, а затем запустите следующую, вы должны изучить Concat. Если вы хотите иметь несколько разных наблюдаемых, которые подписаны одновременно, вы можете использовать Merge (если семантика имеет смысл для вашего сценария). Если слияние не подходит, я бы рекомендовал использовать один из стандартных методов синхронизации потоков (блокировка, монитор и т. Д.) В методах наблюдателя или EventLoopScheduler, о котором вы уже знаете.

РЕДАКТИРОВАТЬ Исходный ответ сохраняется ниже

Да, возможно принудительное выполнение последовательного наблюдателя. Однако, нужно вам или нет, зависит от наблюдаемого. В общем, горячие наблюдаемые уже будут работать последовательно, а холодные наблюдаемые - нет. Это побочный эффект разницы в том, как работают горячие и холодные наблюдаемые. Чтобы сделать наблюдаемый холод горячим и, таким образом, заставить наблюдателей работать последовательно, используйте Publish. Вот пример, демонстрирующий различные варианты поведения.

Sub Main()
    'hot observable, runs serially
    Dim trigger As New ObsEvent
    Dim eobs = Observable.FromEventPattern(Of ItemEventArgs(Of String))(
                    Sub(h) AddHandler trigger.Triggered, h,
                    Sub(h) RemoveHandler trigger.Triggered, h)
    Dim sub1 = eobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting event observer 1: {0}", v.EventArgs.Item)
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending event observer 1")
                              End Sub)
    trigger.Trigger("event trigger 1")
    Dim sub2 = eobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting event observer 2: {0}", v.EventArgs.Item)
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending event observer 2")
                              End Sub)
    trigger.Trigger("event trigger 2")

    Console.WriteLine()
    Console.WriteLine()

    'cold observable, runs "simultaneously"
    Dim tobs = Observable.Timer(TimeSpan.FromSeconds(5))
    sub1 = tobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting timer observer 1")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending timer observer 1")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer 1 completed"))
    Thread.Sleep(500)
    sub2 = tobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting timer observer 2")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending timer observer 2")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer 2 completed"))

    'cold observable turned hot, runs serially
    Dim pobs = tobs.Publish()
    sub1 = pobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting publish observer 1")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending publish observer 1")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer P1 completed"))
    Thread.Sleep(500)
    sub2 = pobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting publish observer 2")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending publish observer 2")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer P2 completed"))
    pobs.Connect()

    Console.ReadKey()
End Sub
...