[РЕДАКТИРОВАТЬ: благодаря ответу MSalters и ответу Раймонда Чена на InterlockedIncrement против EnterCriticalSection / counter ++ / LeaveCriticalSection , проблема решена, и приведенный ниже код работает правильно. Это должно привести к интересному простому примеру использования Thread Pool в Windows]
Мне не удается найти простой пример следующей задачи. Моя программа, например, должна увеличивать значения в огромном std :: vector на единицу, поэтому я хочу сделать это параллельно. Это необходимо сделать несколько раз за время существования программы. Я знаю, как это сделать, используя CreateThread при каждом вызове подпрограммы, но мне не удается избавиться от CreateThread с помощью ThreadPool.
Вот что я делаю:
class Thread {
public:
Thread(){}
virtual void run() = 0 ; // I can inherit an "IncrementVectorThread"
};
class IncrementVectorThread: public Thread {
public:
IncrementVectorThread(int threadID, int nbThreads, std::vector<int> &vec) : id(threadID), nb(nbThreads), myvec(vec) { };
virtual void run() {
for (int i=(myvec.size()*id)/nb; i<(myvec.size()*(id+1))/nb; i++)
myvec[i]++; //and let's assume myvec is properly sized
}
int id, nb;
std::vector<int> &myvec;
};
class ThreadGroup : public std::vector<Thread*> {
public:
ThreadGroup() {
pool = CreateThreadpool(NULL);
InitializeThreadpoolEnvironment(&cbe);
cleanupGroup = CreateThreadpoolCleanupGroup();
SetThreadpoolCallbackPool(&cbe, pool);
SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
threadCount = 0;
}
~ThreadGroup() {
CloseThreadpool(pool);
}
PTP_POOL pool;
TP_CALLBACK_ENVIRON cbe;
PTP_CLEANUP_GROUP cleanupGroup;
volatile long threadCount;
} ;
static VOID CALLBACK runFunc(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context,
PTP_WORK Work) {
ThreadGroup &thread = *((ThreadGroup*) Context);
long id = InterlockedIncrement(&(thread.threadCount));
DWORD tid = (id-1)%thread.size();
thread[tid]->run();
}
void run_threads(ThreadGroup* thread_group) {
SetThreadpoolThreadMaximum(thread_group->pool, thread_group->size());
SetThreadpoolThreadMinimum(thread_group->pool, thread_group->size());
TP_WORK *worker = CreateThreadpoolWork(runFunc, (void*) thread_group, &thread_group->cbe);
thread_group->threadCount = 0;
for (int i=0; i<thread_group->size(); i++) {
SubmitThreadpoolWork(worker);
}
WaitForThreadpoolWorkCallbacks(worker,FALSE);
CloseThreadpoolWork(worker);
}
void main() {
ThreadGroup group;
std::vector<int> vec(10000, 0);
for (int i=0; i<10; i++)
group.push_back(new IncrementVectorThread(i, 10, vec));
run_threads(&group);
run_threads(&group);
run_threads(&group);
// now, vec should be == std::vector<int>(10000, 3);
}
Итак, если я правильно понял:
- команда CreateThreadpool создает группу потоков (следовательно, вызов CreateThreadpoolWork является дешевым, поскольку он не вызывает CreateThread)
- У меня может быть столько пулов потоков, сколько я хочу (если я хочу создать пул потоков для «IncrementVector» и один для моих потоков «DecrementVector», я могу).
- если мне нужно разделить мою задачу «вектора приращения» на 10 потоков, вместо 10-кратного вызова CreateThread, я создаю один «рабочий» и отправляю его 10 раз в ThreadPool с тем же параметром (следовательно, мне нужен поток Идентификатор в обратном вызове, чтобы узнать, какую часть моего std :: vector увеличить). Здесь я не смог найти идентификатор потока, так как функция GetCurrentThreadId () возвращает реальный идентификатор потока (то есть, что-то вроде 1528, а не что-то между 0..nb_launched_threads).
Наконец, я не уверен, что хорошо понял концепцию: действительно ли мне нужен один рабочий, а не 10, если я разделю свой std :: vector на 10 потоков?
Спасибо!