Как я могу постоянно QueueUserWorkItems, но без очереди их всех сразу? - PullRequest
3 голосов
/ 10 сентября 2009

Я работаю над многопоточным шабером для веб-сайта, и по другому вопросу я решил использовать ThreadPool с QueueUserWorkItem ().

Как я могу постоянно ставить рабочие элементы в очередь, не ставя их в очередь сразу? Мне нужно поставить в очередь> 300 тыс. Элементов (по одному на каждый идентификатор пользователя), и если я зациклюсь, чтобы поставить их в очередь, мне не хватит памяти.

Итак, что бы я хотел:

// 1 = startUserID, 300000 = endUserID, 25 = MaxThreads  
Scraper webScraper = new Scraper(1, 300000, 25); 

webScraper.Start();  
// return immediately while webScraper runs in the background

В течение этого времени webScraper непрерывно добавляет все 300000 рабочих элементов по мере появления потоков.

Вот что у меня есть:

public class Scraper
    {
        private int MaxUserID { get; set; }
        private int MaxThreads { get; set; }
        private static int CurrentUserID { get; set; }
        private bool Running { get; set; }
        private Parser StatsParser = new Parser();


        public Scraper()
            : this(0, Int32.MaxValue, 25)
        {
        }

        public Scraper(int CurrentUserID, int MaxUserID, int MaxThreads)
        {
            this.CurrentUserID = CurrentUserID;
            this.MaxUserID = MaxUserID;
            this.MaxThreads = MaxThreads;
            this.Running = false;

            ThreadPool.SetMaxThreads(MaxThreads, MaxThreads);
        }

        public void Start()
        {
            int availableThreads;

            // Need to start a new thread to spawn the new WorkItems so Start() will return right away?
            while (Running)
            {

                // if (!CurrentUserID >= MaxUserID)
                // {
                //     while (availableThreads > 0)
                //     {
                //         ThreadPool.QueueUserWorkItem(new WaitCallBack(Process));
                //     }
                // }
                // else
                // { Running = false; }
            }
        }

        public void Stop()
        {
            Running = false;
        }

        public static void process(object state)
        {
             var userID = Interlocked.Increment(ref CurrentUserID);
             ... Fetch Stats for userID
        }
    }

Это правильный подход?

Может ли кто-нибудь указать мне правильное направление для обработки создания моих рабочих элементов в фоновом режиме после вызова Start (), а не для создания всех рабочих элементов одновременно?

Ответы [ 5 ]

2 голосов
/ 10 сентября 2009

Будет ли это лучше реализовано с меньшим количеством рабочих элементов, которые крадут работу из очереди работы? Если у вас есть 300 000 единиц работы, это не значит, что вам нужно 300 000 рабочих. Очевидно, что поскольку у вас есть только несколько ядер, только несколько из этих частей работы могут выполняться параллельно, так почему бы не раздать куски работы гораздо меньшему числу работников?

В зависимости от того, насколько постоянным является время, затрачиваемое на каждую часть работы, вы можете либо равномерно распределить его по каждому работнику, либо иметь центральную очередь (которую вам придется заблокировать), и каждый работник может получить некоторую работу как оно кончается.

РЕДАКТИРОВАТЬ:

Джо Даффи, кажется, готовит серию статей о написании Очереди за кражу работы: http://www.bluebytesoftware.com/blog/2008/08/12/BuildingACustomThreadPoolSeriesPart2AWorkStealingQueue.aspx. Похоже, Threadpool в .Net 4 будет немного умнее. Но я не думаю, что вам нужно что-то особенно сложное для этого сценария.

0 голосов
/ 28 октября 2009

Вы можете использовать другой пул потоков. Вот один из них: http://www.codeplex.com/smartthreadpool Это позволяет вам ставить в очередь все свои вещи одновременно. Вы можете назначить максимальное количество потоков для создания. Скажем, у вас есть 1000 рабочих элементов, и вы назначаете 100 потоков. Он сразу же возьмет первые 100 предметов и запустит их, пока остальные ждут. Как только один из этих элементов завершен и поток освобождается, запускается следующий элемент в очереди. Он управляет всей работой, но не насыщает потоки и память. Кроме того, он не использует потоки из пула потоков .net.

0 голосов
/ 27 октября 2009

Похоже, вам нужен мастер-класс управления процессом, который определяет количество увольняемых рабочих и поддерживает заполнение очереди.

Тогда вы можете работать с двумя очередями:

  1. Один, чтобы держать все предметы, которые нужно очистить
  2. Второй, чтобы сделать работу

Этот объект Master / Governor будет продолжать цикл до тех пор, пока все ваши элементы из очереди # 1 не исчезнут, и он будет продолжать добавляться в очередь № 2, когда у вас есть доступные циклы.

0 голосов
/ 26 октября 2009

Я определенно не буду использовать ThreadPool.SetMaxThreads - помните, что пул потоков распределяется между всеми процессами - установка максимального количества потоков просто снизит производительность. Идея пула потоков заключается в том, что вам не нужно указывать такие вещи, как максимальное количество потоков - платформа .Net вычисляет оптимальное количество потоков для распределения - вам не нужно это делать.

Обратите внимание, что постановка в очередь 300 000 элементов не приведет к появлению 300 000 потоков - класс ThreadPool будет управлять количеством потоков для вас и повторно использовать потоки по мере необходимости. Если вы просто обеспокоены тем, что таким образом будет израсходовано слишком много ресурсов, я бы порекомендовал вам уточнить ваш процесс - возможно, создать класс Spawner, который, в свою очередь, запускает 1000 экземпляров скребка?

0 голосов
/ 26 октября 2009

Я думаю, что создание очереди из поставленных в очередь элементов как-то не совсем правильно, так как насчет того, чтобы заставить WorkItems снова ставить себя в очередь после того, как они закончили?

Ваш метод Start может поставить в очередь, скажем, 3 раза элементы MaxThreads (75 в вашем примере), а затем ваш метод Process будет поставлен в очередь, когда он будет завершен. Таким образом, ваш метод Start быстро возвращается, но запускает несколько рабочих элементов, которые, как я уже сказал, запускаются сами:


    public class Scraper
    {
        private int MaxUserID { get; set; }
        private int MaxThreads { get; set; }
        private int currentUserID;
        private bool Running { get; set; }
        private Parser StatsParser = new Parser();

        private int Multiplier { get; set; }

        public Scraper()
            : this(0, Int32.MaxValue, 25)
        {
        }

        public Scraper(int currentUserID, int maxUserID, int maxThreads)
        {
            this.currentUserID = currentUserID;
            this.MaxUserID = maxUserID;
            this.MaxThreads = maxThreads;
            this.Running = false;

            ThreadPool.SetMaxThreads(maxThreads, maxThreads);
            Multiplier = 3;
        }

        public void Start()
        {
            Running = true;
            for (int i = 0; i < MaxThreads * Multiplier; i++)
            {
                ThreadPool.QueueUserWorkItem(Process);
            }
        }

        public void Stop()
        {
            Running = false;
        }

        public void Process(object state)
        {
            if (Running == false)
            {
                return;
            }
            if (currentUserID < MaxUserID)
            {
                Interlocked.Increment(ref currentUserID);
                //Parse stats for currentUserID
                ThreadPool.QueueUserWorkItem(Process);
            }
            else
            { Running = false; }
        }
    }

Я уверен, что флаг безопасности должен быть установлен с использованием Interlocked для безопасности. Я сделал множитель в свойстве, которое можно было передать конструктору - я вполне уверен, что его можно откорректировать, чтобы настроить производительность в зависимости от того, сколько времени потребуется для анализа этой статистики.

...