Простой пример Windows Thread Pool - PullRequest
5 голосов
/ 02 декабря 2011

[РЕДАКТИРОВАТЬ: благодаря ответу MSalters и ответу Раймонда Чена на InterlockedIncrement против EnterCriticalSection / counter ++ / LeaveCriticalSection , проблема решена, и приведенный ниже код работает правильно. Это должно привести к интересному простому примеру использования Thread Pool в Windows]

Мне не удается найти простой пример следующей задачи. Моя программа, например, должна увеличивать значения в огромном std :: vector на единицу, поэтому я хочу сделать это параллельно. Это необходимо сделать несколько раз за время существования программы. Я знаю, как это сделать, используя CreateThread при каждом вызове подпрограммы, но мне не удается избавиться от CreateThread с помощью ThreadPool.

Вот что я делаю:

class Thread {
public:
    Thread(){}
    virtual void run() = 0 ; // I can inherit an "IncrementVectorThread"
};
class IncrementVectorThread: public Thread {
public:
   IncrementVectorThread(int threadID, int nbThreads, std::vector<int> &vec) : id(threadID), nb(nbThreads), myvec(vec) { };

   virtual void run() {
        for (int i=(myvec.size()*id)/nb; i<(myvec.size()*(id+1))/nb; i++)
          myvec[i]++; //and let's assume myvec is properly sized
    }
   int id, nb;
   std::vector<int> &myvec;
};

class ThreadGroup : public std::vector<Thread*> {
public:
    ThreadGroup() { 
         pool = CreateThreadpool(NULL);
         InitializeThreadpoolEnvironment(&cbe);
         cleanupGroup = CreateThreadpoolCleanupGroup();
         SetThreadpoolCallbackPool(&cbe, pool);
         SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
         threadCount = 0;
    }
    ~ThreadGroup() {
         CloseThreadpool(pool);
}
    PTP_POOL pool;
    TP_CALLBACK_ENVIRON cbe;
    PTP_CLEANUP_GROUP cleanupGroup;
    volatile long threadCount;
} ;


static VOID CALLBACK runFunc(
                PTP_CALLBACK_INSTANCE Instance,
                PVOID Context,
                PTP_WORK Work) {

   ThreadGroup &thread = *((ThreadGroup*) Context);
   long id = InterlockedIncrement(&(thread.threadCount));
   DWORD tid = (id-1)%thread.size();
   thread[tid]->run();
}

void run_threads(ThreadGroup* thread_group) {
    SetThreadpoolThreadMaximum(thread_group->pool, thread_group->size());
    SetThreadpoolThreadMinimum(thread_group->pool, thread_group->size());

    TP_WORK *worker = CreateThreadpoolWork(runFunc, (void*) thread_group, &thread_group->cbe);
    thread_group->threadCount = 0;
    for (int i=0; i<thread_group->size(); i++) {
        SubmitThreadpoolWork(worker);
     }  
     WaitForThreadpoolWorkCallbacks(worker,FALSE);  
     CloseThreadpoolWork(worker);   
}       

void main() {

   ThreadGroup group;
   std::vector<int> vec(10000, 0);
   for (int i=0; i<10; i++)
      group.push_back(new IncrementVectorThread(i, 10, vec));

   run_threads(&group);
   run_threads(&group);
   run_threads(&group);

   // now, vec should be == std::vector<int>(10000, 3);       
}

Итак, если я правильно понял:
- команда CreateThreadpool создает группу потоков (следовательно, вызов CreateThreadpoolWork является дешевым, поскольку он не вызывает CreateThread)
- У меня может быть столько пулов потоков, сколько я хочу (если я хочу создать пул потоков для «IncrementVector» и один для моих потоков «DecrementVector», я могу).
- если мне нужно разделить мою задачу «вектора приращения» на 10 потоков, вместо 10-кратного вызова CreateThread, я создаю один «рабочий» и отправляю его 10 раз в ThreadPool с тем же параметром (следовательно, мне нужен поток Идентификатор в обратном вызове, чтобы узнать, какую часть моего std :: vector увеличить). Здесь я не смог найти идентификатор потока, так как функция GetCurrentThreadId () возвращает реальный идентификатор потока (то есть, что-то вроде 1528, а не что-то между 0..nb_launched_threads).

Наконец, я не уверен, что хорошо понял концепцию: действительно ли мне нужен один рабочий, а не 10, если я разделю свой std :: vector на 10 потоков?

Спасибо!

1 Ответ

6 голосов
/ 02 декабря 2011

Вы примерно до последней точки.

Вся идея пула потоков в том, что вам все равно, сколько у него потоков.Вы просто добавляете много работы в пул потоков и позволяете ОС определять, как выполнять каждый фрагмент.Таким образом, если вы создаете и отправляете 10 чанков, ОС может использовать от 1 до 10 потоков из пула.

Вам не нужно заботиться об этих идентификаторах потоков.Не связывайтесь с идентификаторами потоков, минимальным или максимальным количеством потоков и тому подобным.

Если вас не интересуют идентификаторы потоков, как вы можете управлять тем, какую часть вектора изменить?Просто.Перед созданием пула потоков инициализируйте счетчик на ноль.В функции обратного вызова вызовите InterlockedIncrement, чтобы получить и увеличить счетчик.За каждый отправленный рабочий элемент вы получите последовательное целое число.

...