Масштабируемость за счет экономии потоков: асинхронные операции и многопоточные очереди производителей / потребителей в пуле потоков? - PullRequest
3 голосов
/ 27 января 2012

Асинхронное программирование - это способ достижения масштабируемости на веб-серверах за счет экономии потоков, так что очень немногие неблокирующие потоки могут обрабатывать много одновременных запросов. Например, Node.js обеспечивает масштабируемость, используя только один поток, используя асинхронные операции.

В настоящее время я использую базу данных MongoDb и ее официальный драйвер C #, который еще не поддерживает асинхронные операции. Поэтому я рассматриваю возможность использования простой очереди производителя / потребителя для обработки запросов mongodb, чтобы уменьшить количество блокирующих потоков. Это достигается тем, что потоки пула потоков вставляют запросы db в очередь, а затем позволяют им продолжать выполнение других задач. В очереди есть еще один выделенный поток, выполняющий фактические запросы db, и когда запросы возвращаются с результатами, результат передается потоку пула потоков.

Однако мне теперь интересно, нужно ли использовать очередь при использовании пула потоков (через TPL и задачи из c # 4.0), поскольку у пула потоков есть максимальное ограничение на количество потоков. Когда этот предел достигнут, запросы помещаются в очередь, пока потоки пула потоков не станут доступными. Так звучит ли так, как если бы пул потоков предоставлял функции очереди из коробки, и поэтому ничего не было бы получено при использовании моей собственной очереди или как?

Еще одна вещь, которая меня интересует, это следующий комментарий от превосходной книги "C # 4.0 в двух словах", страница. 928: «Существует исключение из правила« не блокировать ». Обычно нормально блокировать при вызове сервера базы данных - если другие потоки конкурируют за один и тот же сервер. Это потому, что в системе с высокой степенью параллелизма база данных должна быть спроектирована так что большинство запросов выполняются чрезвычайно быстро. Если в результате вы получите тысячи одновременных запросов, это означает, что запросы попадают в базу данных быстрее, чем они могут их обработать. Экономия потоков - это наименьшее из ваших беспокойств. "

Я не могу понять, почему можно блокировать запрос db по сравнению с блокировкой других объектов, таких как запросы к другим серверам. Не лучше ли НЕ блокировать запросы к базе данных, чтобы поток был освобожден для обслуживания других запросов, которые могут не нуждаться в доступе к БД.

Подводя итог: можно ли добиться экономии потоков, полагаясь на максимальное количество потоков пула потоков, или было бы лучше создать простую очередь потребителя-производителя, и почему нормально блокировать вызовы на серверы баз данных?

Ответы [ 2 ]

4 голосов
/ 27 января 2012

Нельзя блокировать потоки TP в запросах базы данных. Фраза в кавычках гласит, что это нормально, если все потоков TP блокируют такие запросы. С этим не поспоришь, но это выглядит довольно искусственно.

Основная задача диспетчера потоков состоит в том, чтобы он никогда не выполнял больше потоков, чем имеется в доступных ядрах на машине. Поскольку многопоточность неэффективна, переключение контекста между потоками довольно дорого. Это, однако, не будет работать очень хорошо, если выполняющийся поток TP блокирует и не выполняет никакой реальной работы. Менеджер TP не настолько умен, чтобы знать , что поток TP блокирует, и не может предсказать, как долго он будет блокироваться. Только движок dbase может догадаться об этом, и он не говорит.

Таким образом, у менеджера TP есть простой алгоритм для решения этой проблемы: если ни один из выполняющихся потоков TP не завершает работу в течение разумного времени, то он позволяет запустить другой. Теперь у вас есть больше активных потоков, чем у процессорных ядер. «Разумное время» для диспетчера .NET TP составляет полсекунды.

Это продолжается при необходимости, дополнительные потоки могут работать, пока существующие застряли в колее.

На самом деле получение количества потоков ThreadPool.GetMaxThreads () невероятно вредно для здоровья. Это огромное число, в 250 раз превышающее количество ядер в машине. На 4-ядерном компьютере, чтобы достичь максимума, потребуется 999 исполняющихся потоков, которые не достигли прогресса в течение 499 секунд. Эти потоки будут использовать для своих стеков классное гигабайт адресного пространства. Вы далеко за пределами наблюдения "здесь что-то не так", если оно когда-либо попадет туда.

В этом ответе есть несколько простых количественных чисел. Как только операция начинает занимать более полсекунды, вам нужно подумать о том, чтобы выполнить ее в выделенном потоке. Запись на полсекунды действительно возможна только при блокировке, поэтому поток гораздо более уместен, чем поток TP. И да, используйте потокобезопасную очередь, чтобы заполнить ее запросами операций. Также важно, чтобы вы установили верхний предел количества ожидающих запросов, чтобы не затопить очередь. Дросселировать производителя путем блокировки. И, конечно же, не забывайте, что будет через год. Базы данных никогда не становятся быстрее, когда они стареют.

0 голосов
/ 27 января 2012

Вот очередь, использующая пул потоков для обратных вызовов, и с верхним пределом очереди, как было предложено (еще не проверено). Я думаю, что это сработает, или есть ли лучшие или более простые альтернативы?

public class CallbackQueueItem<T> {
    public Func<T> Func { get; set; }
    public Action<object> Callback { get; set; }
}

public class CallbackQueue<T> {
    private readonly BlockingCollection<CallbackQueueItem<T>> _items;

    public CallbackQueue(int upperLimit) {
        _items = new BlockingCollection<CallbackQueueItem<T>>(upperLimit);            
    }
    private BlockingCollection<CallbackQueueItem<T>> Items {
        get { return _items; }
    }
    public void Start()
    {
        Task.Factory.StartNew(() => {
            while(!Items.IsCompleted) {
                CallbackQueueItem<T> item;
                try {
                    item = Items.Take();
                }
                catch(InvalidOperationException) {
                    break;
                }
                if(item != null) {
                    var result = item.Func();
                    Task.Factory.StartNew(item.Callback,result);
                }
            }
        });
    }

    public void Stop() {
        Items.CompleteAdding();
    }

    public void Push(CallbackQueueItem<T> item) {
        Items.Add(item);
    }
}
...