Может ли наблюдатель безопасно прослушивать несколько наблюдаемых с помощью Rx? - PullRequest
3 голосов
/ 13 декабря 2011

Я пытаюсь увидеть, как несколько наблюдаемых событий могут быть переданы в один набор событий с помощью Rx.Но я получаю исключение, когда запускаю приведенный ниже код.Значит ли это, что несколько наблюдателей всегда склонны к исключениям из-за нарушения грамматики Rx?Я имею в виду, что если два из этих нескольких наблюдателей генерируют событие в одно и то же время случайно (любые два наблюдаемых будут иметь некоторую вероятность создания одновременно), это должно дать исключение.

DateTimeOffset start;
        object sync = new object();
        var subject = new Subject<long>();
        var observer = Observer.Create<long>(c =>
        {
            lock (sync)
            {
                Console.WriteLine(c);
            }
        })
            ;

        var observable1 = Observable.Interval(TimeSpan.FromSeconds(2));
        var observable2 = Observable.Interval(TimeSpan.FromSeconds(5));
        var observable3 = Observable.Never<long>().Timeout
            (start = DateTimeOffset.Now.AddSeconds(15),
             (new long[] { 1 }).ToObservable());
        var observable4 = Observable.Never<long>().Timeout(start);
        observable1.Subscribe(observer);
        observable2.Subscribe(observer);
        observable3.Subscribe(observer);
        observable4.Subscribe(observer);
        Thread.Sleep(20000);

Спасибо Гедеону за объяснение.Это исключение, которое я получаю.Вы правы в том, что это время для исключения.Это была ошибка кодирования.Спасибо.

System.TimeoutException: The operation has timed out.
   at System.Reactive.Observer.<Create>b__8[T](Exception e)
   at System.Reactive.AnonymousObserver`1.Error(Exception exception)
   at System.Reactive.AbstractObserver`1.OnError(Exception error)
   at System.Reactive.Subjects.Subject`1.OnError(Exception error)
   at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e
xception)
   at System.Reactive.AbstractObserver`1.OnError(Exception error)
   at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e
xception)
   at System.Reactive.AbstractObserver`1.OnError(Exception error)
   at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<>c__DisplayClass28
e.<Throw>b__28b()
   at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
   at System.Reactive.Concurrency.ImmediateScheduler.Schedule[TState](TState sta
te, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio
n action)
   at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<Throw>b__28a(IObse
rver`1 observer)
   at System.Reactive.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()

   at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
   at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore()
   at System.Reactive.Concurrency.ScheduledItem`1.Invoke()
   at System.Reactive.Concurrency.CurrentThreadScheduler.Trampoline.Run()
   at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState
 state, TimeSpan dueTime, Func`3 action)
   at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState
 state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio
n action)
   at System.Reactive.AnonymousObservable`1.Subscribe(IObserver`1 observer)
   at System.Reactive.Linq.Observable.<>c__DisplayClass543`1.<>c__DisplayClass54
5.<Timeout>b__53f()
   at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
   at System.Reactive.Concurrency.ThreadPoolScheduler.<>c__DisplayClass8`1.<Sche
dule>b__6(Object _)
   at System.Threading._TimerCallback.TimerCallback_Context(Object state)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, C
ontextCallback callback, Object state, Boolean ignoreSyncCtx)
   at System.Threading._TimerCallback.PerformTimerCallback(Object state)

Ответы [ 2 ]

5 голосов
/ 14 декабря 2011

Да, наблюдатель может слушать несколько наблюдаемых.Лучшим примером для этого будет оператор Merge.Все встроенные операторы будут следовать грамматике RX и часто применяют ее в источниках, которые этого не делают.

IObserver, полученный из Observer.Create, является одним из таких случаев.Он будет игнорировать любые будущие вызовы OnNext после вызова OnError или OnCompleted.Это означает, что использование одного и того же наблюдателя для подписки на одну наблюдаемую и затем на другую наблюдаемую после первой не будет работать, потому что сообщение о завершении из первой наблюдаемой заставит наблюдателя игнорировать сообщения от второй наблюдаемой.Чтобы обойти это, операторы типа Merge, Concat и OnErrorResumeNext (среди прочих) используют несколько наблюдателей внутренне и не передают сообщения о завершении (OnError и / или OnCompleted в зависимости от семантики оператора) из любого, кромепоследняя наблюдаемая для внешнего наблюдателя.

Вы не упомянули, какое исключение вы получаете, но я предполагаю, что это ошибка, возникающая из-за тайм-аута, который вы получаете из observable4.Если вы не предоставите другую наблюдаемую для использования в течение тайм-аута, вызывается OnError наблюдателя, и по умолчанию OnError для перегрузок Subscribe и Observer.Create, которые не принимают обработчик ошибок, - простоисключение.

Хотя это явно пример / тестирующий код, я хочу отметить, что даже если вы больше не получаете сообщения, передаваемые в OnNext, все другие наблюдаемые продолжают работать после этого исключения.Либо используйте Merge, чтобы отследить это для вас, либо отследить все расходные материалы из описания и утилизировать их самостоятельно, когда появится сообщение о завершении.CompositeDisposableSystem.Reactive.Disposables) хорошо для этого.

2 голосов
/ 17 декабря 2011

Вы действительно не должны использовать здесь замки, но если вы действительно хотите, чтобы это работало, вы можете сделать:

var x = Observable.Create<T>(subj => { /* Fill it in*/ })
    .Multicast(new Subject<T>());

// Set up your subscriptions Here!

// When you call the Connect, whatever is in the Observable.Create will be called
x.Connect();

Если вы хотите быть еще более безопасным, вы можете сделать так, чтобырезультат Create будет «воспроизведен» вам для будущих подписок, используя ReplaySubject вместо Subject (тогда как для Subject подписчики после Connect не получат ничего)

...