Обработка асинхронной очереди с реактивными расширениями - PullRequest
7 голосов
/ 30 июня 2011

Об этом есть пара статей, и у меня это работает ... но я хочу знать, как установить максимальное количество потоков задач для моих подписок Observable одновременно.

У меня есть следующее, чтобы распараллелить асинхронное сохранение записей журнала:

private BlockingCollection<ILogEntry> logEntryQueue;

и

 logEntryQueue = new BlockingCollection<ILogEntry>();
 logEntryQueue.GetConsumingEnumerable().ToObservable(Scheduler.TaskPool).Subscribe(SaveLogEntry);

Чтобы запланировать мое сохранение ... но как мне указать максимальное количество потоков, которое планировщик должен использовать одновременно?

Ответы [ 2 ]

9 голосов
/ 01 июля 2011

Это не функция наблюдаемого, а функция планировщика. Observable определяет what , а планировщик определяет где .

Вам нужно было бы передать пользовательский планировщик. Простой способ сделать это - создать подкласс TaskScheduler и переопределить свойство MaximumConcurrencyLevel.

http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.maximumconcurrencylevel.aspx

Я действительно нашел образец этого в MSDN:

http://msdn.microsoft.com/en-us/library/ee789351.aspx

Редактировать: Вы спрашивали о том, как перейти от TaskScheduler к IScheduler. Другой разработчик только что дал мне немного информации:

var ischedulerForRx = new TaskPoolScheduler
(
    new TaskFactory
    (
        //This is your custom scheduler
        new LimitedConcurrencyLevelTaskScheduler(1)
    )
);
2 голосов
/ 02 июля 2011

Если вы создаете свою «работу» как IObservable<T> с отложенным выполнением (т. Е. Они хотят делать что-либо до подписки), вы можете использовать перегрузку Merge, которая принимает максимальное число одновременных подписок:

ISubject<QueueItem> synchronizedQueue = new Subject<QueueItem>().Synchronize();

queue
    .Select(item => StartWork(item))
    .Merge(maxConcurrent: 5) // C# 4 syntax for illustrative purposes
    .Subscribe();

// To enqueue:
synchronizedQueue.OnNext(new QueueItem());
...