ObserveOn с Scheduler.NewThread не наблюдает так, если OnNext наблюдателя заблокирован и продолжен - PullRequest
2 голосов
/ 21 мая 2011

Может кто-нибудь помочь объяснить, почему, когда я "блокирую и продолжаю" последовательность onNext наблюдателя, подписанную на буфер с наблюдаемой во времени последовательностью, этот Scheduler.NewThread больше не применяется?

Например:

Если я буферизую последовательность чисел через

var query = from number in Enumerable.Range(1,200)
            select SnoozeNumberProduction(number);

var observableQuery = query.ToObservable();
var bufferedSequence = observableQuery.Buffer(TimeSpan.FromSeconds(2));

Где SnoozeNumberProduction задерживает генерацию номера на 250 мс

static int SnoozeNumberProduction(Int32 number)
{
    Thread.Sleep(250);
    return number;
}

Теперь позже, если я подпишусь на bufferedSequence с помощью «ObserveOn (Scheduler.NewThread)», чтобы я блокировал четвертый буфер с помощью Console.ReadKey

Random random = new Random();
Int32 count = 0;
bufferedSequence.ObserveOn(Scheduler.NewThread).Subscribe(list =>
{
    Console.WriteLine("({0}) Numbers from {1}-{2} produced on Thread ID {3}", list.Count, list[0], list[list.Count -1], Thread.CurrentThread.ManagedThreadId);

    Thread.Sleep(1000);
    count++;
    if (count == 4)
    {
        Console.WriteLine("count reached to 4, blocking ... press any key to continue ");
        Console.ReadKey(); // Block and build up the queue
    }

    Console.WriteLine("Woken " + list[0] + " - " + list[list.Count - 1]);
});

В этом случае, если я нажимаю какую-либо клавишу примерно через 10 секунд или около того, я вижу, что следующие несколько буферов выполняются на том же ManagedThread, даже если Scheduler.NewThread упоминается в ObserveOn. Может кто-нибудь помочь объяснить это поведение?

Пример вывода:

(7) Numbers from 1-7 produced on Thread ID 12
Woken 1 - 7
(9) Numbers from 8-16 produced on Thread ID 14
Woken 8 - 16
(8) Numbers from 17-24 produced on Thread ID 15
Woken 17 - 24
(8) Numbers from 25-32 produced on Thread ID 16
count reached to 4, blocking ... press any key to continue
Woken 25 - 32
(8) Numbers from 33-40 produced on Thread ID **16**
Woken 33 - 40
(8) Numbers from 41-48 produced on Thread ID **16**
Woken 41 - 48
(8) Numbers from 49-56 produced on Thread ID **16**
Woken 49 - 56
(8) Numbers from 57-64 produced on Thread ID **16**
Woken 57 - 64
(8) Numbers from 65-72 produced on Thread ID **16**
Woken 65 - 72
(8) Numbers from 73-80 produced on Thread ID **16**
Woken 73 - 80
(8) Numbers from 81-88 produced on Thread ID **16**
Woken 81 - 88
(8) Numbers from 89-96 produced on Thread ID **16**

Ответы [ 2 ]

2 голосов
/ 21 мая 2011

ObserveOn сам по себе является слоем в составленной последовательности, единственной задачей которого является переключение на другой планировщик. Тем не менее, ваши сны происходят в Select, происходящем в IEnumerable. Затем эта последовательность преобразуется в IObservable с использованием ToObservable, по умолчанию Dispatcher.CurrentThread.

Только в этот момент вы переключаетесь на другой поток для каждого входящего элемента. Если вы измените его на:

var query = from number in Enumerable.Range(1,200).ToObservable(Dispatcher.NewThread)
            select SnoozeNumberProduction(number);

var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));

Теперь перечисление происходит в новом потоке, и, поскольку вы ничего не делаете для его изменения, оно останется там.

На самом деле Observable.Range начинается с IObservable и принимает необязательный IDispatcher. Тем не менее, я предположил, что ваш источник на самом деле не Enumerable.Range. Если это так, вот эквивалент:

var query = from number in Observable.Range(1,200, Dispatcher.NewThread)
            select SnoozeNumberProduction(number);

var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));
0 голосов
/ 23 мая 2011

Я перепостил этот вопрос на форуме MSDN Rx http://social.msdn.microsoft.com/Forums/en-US/rx/thread/52e72a11-9841-4571-b86d-f805d3aeb7b5 и узнал, что это из соображений эффективности


Вы блокируете вызов OnNext в подписке. Оператор ObserveOn гарантирует, что OnNext будет вызываться как можно больше раз в текущем потоке. Оператор ObserveOn повторно использует текущий поток для последовательного вызова OnNext для стольких значений, сколько доступно в данный момент. Поскольку вы блокируете подписку, накапливается несколько вызовов OnNext. После разблокировки вызовы в очереди выполняются в том же потоке. Я полагаю, что это позволяет избежать накладных расходов на создание нового потока для уведомления, когда это не нужно.

...