Ограничить количество потоков - PullRequest
5 голосов
/ 26 июля 2010

У меня есть список товаров, которые я хочу загрузить. Я использую цикл Loop для итерации списка.

Для каждого элемента в этом списке я создаю новую тему, которая ссылается на этот элемент. Моя проблема в том, что я хочу одновременно ограничить максимальную загрузку.

for (int i = downloadList.Count - 1; i >= 0; i--)
{
    downloadItem item = downloadList[i];
    if (item.Status != 1 && item.Status != 2)
    {
        ThreadStart starter = delegate { this.DownloadItem(ref item); };
        Thread t = new Thread(starter);
        t.IsBackground = true;
        t.Name = item.Name;
        t.Priority = ThreadPriority.Normal;
        t.Start();
    }
}

Я прочитал кое-что о ThreadPool, но потом не могу сослаться на свой предмет. Кто-нибудь может мне помочь? Спасибо! :)

Edit:

Я проверял это:

ThreadPool.SetMaxThreads(maxDownloads, maxDownloads);
ThreadPool.SetMinThreads(maxDownloads, maxDownloads);
ThreadPool.QueueUserWorkItem(DownloadItem, ref item);

Я не знаю, как я могу сослаться на свой элемент загрузки с этой темой .....

Ответы [ 4 ]

9 голосов
/ 26 июля 2010

, если вы используете .NET 4, я настоятельно рекомендую использовать Parallel.ForEach (возможно, в downloadList.Reverse ())

так, что-то вроде:

Parallel.ForEach(downloadList.Reverse(), 
                 new ParallelOptions { MaxDegreeOfParallelism = 8 },
                 item => this.DownloadItem(item));

Если вы не хотите, чтобы вызывающий поток блокировал, вы можете, конечно, QueueUserWorkItem этот вызов.

5 голосов
/ 26 июля 2010

Я решил эту проблему в .Net 3.5, создав потоки и загрузив их в очередь. Затем я читаю поток из очереди, запускаю его и увеличиваю количество запущенных потоков. Я продолжаю делать это, пока не достигну верхнего предела.

Когда каждый поток завершает свою работу, он вызывает метод обратного вызова, который уменьшает количество выполненных операций и сигнализирует программе чтения очереди запускать больше потоков. Для дополнительного контроля вы можете использовать словарь для отслеживания запущенных потоков с ключом ManagedThreadId, чтобы вы могли сигнализировать потокам о прекращении работы раньше или сообщать о прогрессе.

Пример консольного приложения:

using System;
using System.Collections.Generic;
using System.Threading;

namespace ThreadTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supervisor = new Supervisor();
            supervisor.LaunchThreads();
            Console.ReadLine();
            supervisor.KillActiveThreads();
            Console.ReadLine();
        }

        public delegate void WorkerCallbackDelegate(int threadIdArg);
        public static object locker = new object();

        class Supervisor
        {
            Queue<Thread> pendingThreads = new Queue<Thread>();
            Dictionary<int, Worker> activeWorkers = new Dictionary<int, Worker>();

            public void LaunchThreads()
            {
                for (int i = 0; i < 20; i++)
                {
                    Worker worker = new Worker();
                    worker.DoneCallBack = new WorkerCallbackDelegate(WorkerCallback);
                    Thread thread = new Thread(worker.DoWork);
                    thread.IsBackground = true;
                    thread.Start();
                    lock (locker)
                    {
                        activeWorkers.Add(thread.ManagedThreadId, worker);
                    }
                }
            }

            public void KillActiveThreads()
            {
                lock (locker)
                {
                    foreach (Worker worker in activeWorkers.Values)
                    {
                        worker.StopWork();
                    }
                }
            }

            public void WorkerCallback(int threadIdArg)
            {
                lock (locker)
                {
                    activeWorkers.Remove(threadIdArg);
                    if (activeWorkers.Count == 0)
                    {
                        Console.WriteLine("no more active threads");
                    }
                }
            }
        }

        class Worker
        {
            public WorkerCallbackDelegate DoneCallBack { get; set; }
            volatile bool quitEarly;

            public void DoWork()
            {
                quitEarly = false;
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " started");
                DateTime startTime = DateTime.Now;
                while (!quitEarly && ((DateTime.Now - startTime).TotalSeconds < new Random().Next(1, 10)))
                {
                    Thread.Sleep(1000);
                }
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " stopped");
                DoneCallBack(Thread.CurrentThread.ManagedThreadId);
            }

            public void StopWork()
            {
                quitEarly = true;
            }
        }
    }
}
1 голос
/ 28 сентября 2010

Лучший способ справиться с этим - создать только maxDownloads количество потоков. Поместите все свои рабочие элементы в очередь и позвольте потокам конкурировать друг с другом, чтобы выяснить, какой из них обрабатывает каждый рабочий элемент.

var queue = new ConcurrentQueue<downloadItem>(downloadList);
for (int i = 0; i < Math.Min(maxDownloads, queue.Count))
{
  var thread = new Thread(
    () =>
    {
      while (true)
      {
        downloadItem item = null;
        if (queue.TryDequeue(out item))
        {
          // Process the next work item.
          DownloadItem(item);
        }
        else
        {
          // No more work items are left.
          break;
        }
      }
    });
    thread.IsBackground = true;
    thread.Start();
}

Вы также можете использовать семафор для регулирования количества потоков, обрабатывающих рабочие элементы. Это особенно полезно, когда фактическое количество потоков неизвестно, как в случае, если вы использовали ThreadPool.

var semaphore = new Semaphore(maxDownloads, maxDownloads);
for (int i = 0; i < downloadList.Count; i++)
{
  downloadItem item = downloadList[i];
  ThreadPool.QueueUserWorkItem(
    (state) =>
    {
      semaphore.WaitOne();
      try
      {
        DownloadItem(item);
      }
      finally
      {
        semaphore.Release();
      }
    });
}

Я не особенно люблю оба подхода. Проблема с первым заключается в том, что создается нефиксированное количество потоков. Как правило, рекомендуется избегать создания потоков в цикле for, поскольку это имеет тенденцию плохо масштабироваться. Проблема со вторым состоит в том, что семафор заблокирует некоторые из потоков ThreadPool. Это также не рекомендуется, потому что вы эффективно запрашиваете один из потоков, а затем ничего не делаете с ним. Это может повлиять на выполнение других не связанных задач, которые совместно используют ThreadPool. Я думаю, что в этом случае любой из двух вариантов подойдет, поскольку создание более масштабируемого шаблона - это больше работы, чем оно того стоит.

0 голосов
/ 26 июля 2010

Я не понимаю, почему вы все равно пытаетесь использовать ключевое слово ref.Объекты передаются по ссылке в C # по умолчанию, и в исходном коде вы не используете item после того, как оно передано DownloadItem.Поэтому я бы предложил использовать ThreadPool методы, которые вы попробовали, но не использовать ref параметр.

Надеюсь, что это поможет.

...