У меня есть собственный класс пула потоков, который создает несколько потоков, каждый из которых ожидает своего события (сигнала). Когда новое задание добавляется в пул потоков, оно пробуждает первый свободный поток, чтобы выполнить задание.
Проблема заключается в следующем: у меня есть около 1000 циклов в каждой из 10 000 итераций. Эти циклы должны выполняться последовательно, но у меня есть 4 доступных процессора. Я пытаюсь разделить 10 000 циклов итераций на 4 2 500 циклов итераций, то есть по одному на поток. Но мне нужно дождаться окончания 4 маленьких циклов, прежде чем перейти к следующей «большой» итерации. Это означает, что я не могу связать задания.
Моя проблема заключается в том, что использование пула потоков и 4 потоков намного медленнее, чем последовательное выполнение заданий (выполнение одного цикла отдельным потоком намного медленнее, чем последовательное выполнение его непосредственно в основном потоке).
Я в Windows, поэтому я создаю события с CreateEvent()
, а затем жду на одном из них, используя WaitForMultipleObjects(2, handles, false, INFINITE)
, пока основной поток не вызовет SetEvent()
.
Похоже, что все это событие (наряду с синхронизацией потоков с использованием критических секций) довольно дорого!
Мой вопрос: нормально ли, что использование событий отнимает «много» времени? Если так, есть ли другой механизм, который я мог бы использовать, и который был бы менее дорогостоящим?
Вот некоторый код для иллюстрации (некоторые соответствующие части скопированы из моего класса пула потоков):
// thread function
unsigned __stdcall ThreadPool::threadFunction(void* params) {
// some housekeeping
HANDLE signals[2];
signals[0] = waitSignal;
signals[1] = endSignal;
do {
// wait for one of the signals
waitResult = WaitForMultipleObjects(2, signals, false, INFINITE);
// try to get the next job parameters;
if (tp->getNextJob(threadId, data)) {
// execute job
void* output = jobFunc(data.params);
// tell thread pool that we're done and collect output
tp->collectOutput(data.ID, output);
}
tp->threadDone(threadId);
}
while (waitResult - WAIT_OBJECT_0 == 0);
// if we reach this point, endSignal was sent, so we are done !
return 0;
}
// create all threads
for (int i = 0; i < nbThreads; ++i) {
threadData data;
unsigned int threadId = 0;
char eventName[20];
sprintf_s(eventName, 20, "WaitSignal_%d", i);
data.handle = (HANDLE) _beginthreadex(NULL, 0, ThreadPool::threadFunction,
this, CREATE_SUSPENDED, &threadId);
data.threadId = threadId;
data.busy = false;
data.waitSignal = CreateEvent(NULL, true, false, eventName);
this->threads[threadId] = data;
// start thread
ResumeThread(data.handle);
}
// add job
void ThreadPool::addJob(int jobId, void* params) {
// housekeeping
EnterCriticalSection(&(this->mutex));
// first, insert parameters in the list
this->jobs.push_back(job);
// then, find the first free thread and wake it
for (it = this->threads.begin(); it != this->threads.end(); ++it) {
thread = (threadData) it->second;
if (!thread.busy) {
this->threads[thread.threadId].busy = true;
++(this->nbActiveThreads);
// wake thread such that it gets the next params and runs them
SetEvent(thread.waitSignal);
break;
}
}
LeaveCriticalSection(&(this->mutex));
}