Понять поток управления при вызове кода блокировки из неблокирующего блока? - PullRequest
2 голосов
/ 21 января 2012

У меня есть следующий код

 static void Main(string[] args)
        {
            //var source = BlockingMethod();
            var source2 = NonBlocking();
            source2.Subscribe(Console.WriteLine);
            //source.Subscribe(Console.WriteLine);
            Console.ReadLine();

        }
            private static IObservable<string> BlockingMethod()
            {
              var subject = new ReplaySubject<string>();
              subject.OnNext("a");
              subject.OnNext("b");
              subject.OnCompleted();
              Thread.Sleep(1000);
              return subject;
            }
            private static IObservable<string> NonBlocking()
            {
                return Observable.Create<string>(
                    observable =>
                        {
                            observable.OnNext("c");
                            observable.OnNext("d");
                            observable.OnCompleted();
                            //Thread.Sleep(1000);

                            var source = BlockingMethod();
                            source.Subscribe(Console.WriteLine);

                            return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
                            //or can return an Action like
                            //return () => Console.WriteLine("Observer has unsubscribed");
                        });
            }
        }

, который печатает

c
d
Observer has unsubscribed
a
b

Может кто-нибудь помочь мне получить поток управления в программе.Я пытался читать стек вызовов и т. Д., Но не мог понять все.

РЕДАКТИРОВАТЬ Почему я получаю вышеприведенный вывод (который я считаю правильным) вместо

 c 
 d 
 a 
 b 
 Observer has unsubscribed

1 Ответ

2 голосов
/ 22 января 2012

Разница между вашим ожидаемым поведением и фактическим поведением происходит из следующей строки:

var subject = new ReplaySubject<string>();

По умолчанию ReplaySubject использует Scheduler.CurrentThread. Это как если бы вы объявили это так:

var subject = new ReplaySubject<string>(Scheduler.CurrentThread);

При планировании с использованием текущего потока вы ставите свои действия в очередь - ожидая завершения выполняющегося в данный момент кода до того, как запустится. Если вы хотите, чтобы код запускался немедленно, вам нужно использовать Scheduler.Immediate примерно так:

var subject = new ReplaySubject<string>(Scheduler.Immediate);

Достаточно ли это объяснить?

...