Почему каждый делегат наблюдения работает в новом потоке - PullRequest
3 голосов
/ 10 февраля 2012

В Rx при использовании Scheduler.NewThread для метода ObserveOn, в чем преимущество того, что каждый делегат Observation (OnNext) работает в новом потоке, когда Rx уже гарантирует, что OnNexts никогда не будет перекрываться. Если каждый OnNext будет вызываться один за другим, зачем нужен новый поток для каждого из них.

Я понимаю, почему нужно запускать делегаты наблюдения в потоке, отличном от потока подписки и приложения, но запускать каждый делегат наблюдения в новом потоке, когда они никогда не будут работать параллельно? .... не имеет смысла я или я что-то здесь упускаю?

Например

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

namespace RxTesting
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);

            var numbers = from number in Enumerable.Range(1,10) select Process(number);

            var observableNumbers = numbers.ToObservable()
                .ObserveOn(Scheduler.NewThread)
                .SubscribeOn(Scheduler.NewThread);

            observableNumbers.Subscribe(
                n => Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId));

            Console.ReadKey();
        }

        private static int Process(int number)
        {
            Thread.Sleep(500);
            Console.WriteLine("Producing : {0} \t on Thread : {1}", number,
                              Thread.CurrentThread.ManagedThreadId);

            return number;
        }
    }
}

Приведенный выше код дает следующий результат. Обратите внимание, что потребление выполняется в новом потоке каждый раз.

Application Thread : 8
Producing : 1    on Thread : 9
Consuming : 1    on Thread : 10
Producing : 2    on Thread : 9
Consuming : 2    on Thread : 11
Producing : 3    on Thread : 9
Consuming : 3    on Thread : 12
Producing : 4    on Thread : 9
Consuming : 4    on Thread : 13
Producing : 5    on Thread : 9
Consuming : 5    on Thread : 14
Producing : 6    on Thread : 9
Consuming : 6    on Thread : 15
Producing : 7    on Thread : 9
Consuming : 7    on Thread : 16
Producing : 8    on Thread : 9
Consuming : 8    on Thread : 17
Producing : 9    on Thread : 9
Consuming : 9    on Thread : 18
Producing : 10   on Thread : 9
Consuming : 10   on Thread : 19

Ответы [ 2 ]

2 голосов
/ 10 февраля 2012

Планировщик NewThread полезен для постоянных подписчиков.Если вы не укажете планировщик, производитель блокируется, ожидая завершения подписчиков.Часто вы можете использовать Scheduler.ThreadPool, но если вы ожидаете, что у вас будет много много долгосрочных задач, вам не захочется забивать ими пул потоков (поскольку он может использоваться не только подписчиками одной наблюдаемой).

Например, рассмотрите следующую модификацию на вашем примере.Я перенес задержку на подписчика и добавил индикацию готовности основного потока для ввода с клавиатуры.Обратите внимание на разницу, когда вы раскомментируете строки NewThead.

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

namespace RxTesting
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);

            var numbers = from number in Enumerable.Range(1, 10) select Process(number);

            var observableNumbers = numbers.ToObservable()
//              .ObserveOn(Scheduler.NewThread)
//              .SubscribeOn(Scheduler.NewThread)
            ;

            observableNumbers.Subscribe(
                n => {
                    Thread.Sleep(500);
                    Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId);
                });

            Console.WriteLine("Waiting for keyboard");
            Console.ReadKey();
        }

        private static int Process(int number)
        {
            Console.WriteLine("Producing : {0} \t on Thread : {1}", number,
                              Thread.CurrentThread.ManagedThreadId);

            return number;
        }
    }
}

Так почему же Rx не оптимизирует использование одного потока для каждого подписчика?Если подписчики работают так долго, что вам нужен новый поток, накладные расходы на создание потока в любом случае будут незначительными.Единственное исключение состоит в том, что если большинство подписчиков короткие, а некоторые работают долго, то оптимизация для повторного использования одного и того же потока действительно будет полезна.

0 голосов
/ 06 марта 2012

Я не уверен, что вы заметили, но если потребители медленнее, чем производители (например, если вы добавите более длительный режим сна в действие подписки), они будут использовать один и тот же поток, так что это может быть механизм обеспечения того, чтобы подписчики потребляют контент сразу после его публикации.

namespace RxTesting
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);

            var numbers = from number in Enumerable.Range(1,10) select Process(number);

            var observableNumbers = numbers.ToObservable()
                .ObserveOn(Scheduler.NewThread)
                .SubscribeOn(Scheduler.NewThread);

            observableNumbers.Subscribe(
                n => 
                    {
                        Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId);
                        Thread.Sleep(600);
                    }
                        );

            Console.ReadKey();
        }

        private static int Process(int number)
        {
            Thread.Sleep(500);
            Console.WriteLine("Producing : {0} \t on Thread : {1}", number,
                              Thread.CurrentThread.ManagedThreadId);

            return number;
        }
    }
}

Выходы:

Application Thread : 1
Producing : 1    on Thread : 3
Consuming : 1    on Thread : 4
Producing : 2    on Thread : 3
Consuming : 2    on Thread : 4
Producing : 3    on Thread : 3
Consuming : 3    on Thread : 4
Producing : 4    on Thread : 3
Consuming : 4    on Thread : 4
Producing : 5    on Thread : 3
Consuming : 5    on Thread : 4
Producing : 6    on Thread : 3
Consuming : 6    on Thread : 4
Producing : 7    on Thread : 3
Producing : 8    on Thread : 3
Consuming : 7    on Thread : 4
Producing : 9    on Thread : 3
Consuming : 8    on Thread : 4
Producing : 10   on Thread : 3
Consuming : 9    on Thread : 4
Consuming : 10   on Thread : 4
...