Streaming Data BlockingCollection - PullRequest
       8

Streaming Data BlockingCollection

4 голосов
/ 28 января 2012

На странице 88 книги Стивена Туба

http://www.microsoft.com/download/en/details.aspx?id=19222

есть код

private BlockingCollection<T> _streamingData = new BlockingCollection<T>();
// Parallel.ForEach
Parallel.ForEach(_streamingData.GetConsumingEnumerable(),
item => Process(item));
// PLINQ
var q = from item in _streamingData.GetConsumingEnumerable().AsParallel()
...
select item;

Стивен затем упоминает

", когда передача результата вызова GetConsumingEnumerable в качестве источника данных в Parallel.ForEach, потоки, используемые цикл может блокироваться, когда коллекция становится пустой. И заблокированный поток не может быть освобожден Parallel.ForEach обратно в ThreadPool для удаления или другого использования. Таким образом, с кодом, как показано выше, если есть какие-либо периоды времени, когда коллекция пуста, число потоков в процессе может стабильно расти; "

Я не понимаю, почему количество потоков будет расти?

Если коллекция пуста, не будет ли blockingcollection не запрашивать дальнейшие потоки?

Следовательно, вам не нужно делать WithDegreeOfParallelism, чтобы ограничить количество потоков, используемых в BlockingCollection

1 Ответ

3 голосов
/ 28 января 2012

В пуле потоков есть алгоритм восхождения на холм, который он использует для оценки соответствующего количества потоков. Пока добавление потоков увеличивает пропускную способность, пул потоков будет создавать больше потоков. Он будет предполагать, что происходит некоторая блокировка или ввод-вывод, и пытается насыщать ЦП, пропуская количество процессоров в системе.

Именно поэтому выполнение операций ввода-вывода и блокировка чего-либо в потоках пула потоков может быть опасным.

Вот полностью рабочий пример такого поведения:

        BlockingCollection<string> _streamingData = new BlockingCollection<string>();

        Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 100; i++)
                {
                    _streamingData.Add(i.ToString());
                    Thread.Sleep(100);
                }
            });

        new Thread(() =>
            {
                while (true)
                {
                    Thread.Sleep(1000);
                    Console.WriteLine("Thread count: " + Process.GetCurrentProcess().Threads.Count);
                }
            }).Start();

        Parallel.ForEach(_streamingData.GetConsumingEnumerable(), item =>
            {
            });

Я не знаю, почему количество потоков продолжает расти, хотя и не увеличивает пропускную способность. Согласно модели, которую я объяснил, она не будет расти. Но я не знаю, правильна ли моя модель.

Возможно, у пула потоков есть дополнительная эвристика, которая делает его порождающим потоки, если он вообще не видит прогресса (измеряется в задачах, выполняемых в секунду). Это имело бы смысл, потому что это, вероятно, предотвратило бы много взаимоблокировок в приложениях. Блокировки могут возникнуть, если важные задачи не могут быть запущены, поскольку они ожидают выхода существующих задач и обеспечения доступности потоков. Это хорошо известная проблема с пулом потоков.

...