У меня есть многопоточная программа openCV, которая использует 4 потока для выполнения следующих действий:
Поток 1-> вызовы cvQueryFrame()
, который захватывает изображения кадра с камеры по одному и сохраняет их в std::vector
inputBuffer
Поток 2-> выполняет пороговое значение для inputBuffer[0]
, копирует результат в другой std::vector
с именем filterOutputBuffer
Поток 3-> выполняет алгоритм оптического потока / рисует поле потокадля первых двух элементов в filterOutputBuffer копирует результат в другой std::vector
с именем ofOutputBuffer
Thread 4-> отображает изображение, используя cvShowImage(ofOutputBuffer[0])
Так что, по сути, я предполагал, что каждый поток выполняетзадание на первый элемент соответствующего входного вектора / буфера и сохранение результата на обратной стороне соответствующего выходного вектора.Вроде как 3 фабричных рабочих делают свою часть на конвейере, а затем бросают конечный результат в ведро для следующего парня.
Я устанавливаю взаимные исключения для всех буферов, и программа работает, только выводзадерживается на несколько секунд из потока живой камеры.
Я запустил не многопоточную версию той же программы (в которой использовался один гигантский цикл while (true)), и она работала в режиме реального времени только со случайным заиканием.
Почему моя параллельная реализация так сильно задерживается в производительности?
Ниже приведены функции потоков:
void writeBuffer()
{
cout << "Thread " << GetCurrentThreadId() << ": Capturing frame from camera!" << endl;
CvCapture *capture = 0;
IplImage *frame = 0;
DWORD waitResult;
if (!(capture = cvCaptureFromCAM(0)))
cout << "Cannot initialize camera!" << endl;
//now start grabbing frames and storing into the vector inputBuffer
while (true)
{
//cout << "Thread " << GetCurrentThreadId() << ": Waiting for mutex to write to input buffer!..." << endl;
waitResult = WaitForSingleObject(hMutex, INFINITE);
switch(waitResult)
{
// The thread got ownership of the mutex
case WAIT_OBJECT_0:
frame = cvQueryFrame(capture); //store the image into frame
if(!frame)
{
cout << "Thread " << GetCurrentThreadId() << ": Error capturing frame from camera!" << endl;
}
//cout << "Thread " << GetCurrentThreadId() << ": Getting Frame..." << endl;
inputBuffer.push_back(*frame);
break;
default:
cout << "Thread " << GetCurrentThreadId() << ": Error acquiring mutex..." << endl;
}
if(!ReleaseMutex(hMutex))
{
cout << "Thread " << GetCurrentThreadId() << ": Error releasing mutex..." << endl;
}
//else cout << "Thread " << GetCurrentThreadId() << ": Done writing to input buffer, Mutex Released!" << endl;
//signal hDoneGettingFrame
PulseEvent(hDoneGettingFrame);
}
cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl;
}
void opticalFlow()
{
...
DWORD waitResult;
//start grabbing frames from the vector inputBuffer
cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from input buffer..." << endl;
while(true)
{
waitResult = WaitForSingleObject(fMutex, INFINITE);
switch(waitResult)
{
// The thread got ownership of the mutex
case WAIT_OBJECT_0:
//grab first two frames from buffer (inputBuffer[0-1]) and process them
if(filterOutputBuffer.size() > 1)
{
frame1 = filterOutputBuffer[0];
frame2 = filterOutputBuffer[1];
filterOutputBuffer.erase(filterOutputBuffer.begin());
}
else
{
if(!ReleaseMutex(fMutex))
cout << "Thread " << GetCurrentThreadId() << ": Error releasing filter mutex..." << endl;
//else cout << "Thread " << GetCurrentThreadId() << ": Input Buffer empty!" << endl;
continue;
}
break;
default:
cout << "Thread " << GetCurrentThreadId() << ": Error acquiring input mutex..." << endl;
continue;
}
if(!ReleaseMutex(fMutex))
{
cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
}
...
//Do optical flow stuff
...
waitResult = WaitForSingleObject(oMutex, INFINITE);
switch(waitResult)
{
// The thread got ownership of the mutex
case WAIT_OBJECT_0:
//cout << "Thread " << GetCurrentThreadId() << ": WRITING TO OUTPUT BUFFER..." << endl;
ofOutputBuffer.push_back(*frame1_3C);
break;
default:
cout << "Thread " << GetCurrentThreadId() << ": Error acquiring output mutex..." << endl;
}
if(!ReleaseMutex(oMutex))
cout << "Thread " << GetCurrentThreadId() << ": Error releasing output mutex..." << endl;
}
cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl;
}
void filterImage()
{
DWORD waitResult;
...
//start grabbing frames from the vector inputBuffer
cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from input buffer..." << endl;
while(true)
{
waitResult = WaitForSingleObject(hMutex, INFINITE);
switch(waitResult)
{
// The thread got ownership of the mutex
case WAIT_OBJECT_0:
//grab first frame and then release mutex
if(inputBuffer.size() > 0)
{
frame = inputBuffer[0];
inputBuffer.erase(inputBuffer.begin());
}
else
{
if(!ReleaseMutex(hMutex))
cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
//else cout << "Thread " << GetCurrentThreadId() << ": Input Buffer empty!" << endl;
continue;
}
break;
default:
cout << "Thread " << GetCurrentThreadId() << ": Error acquiring input mutex..." << endl;
continue;
}
if(!ReleaseMutex(hMutex))
{
cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
}
...
//Tresholding Image Stuff
...
//cout << "Thread " << GetCurrentThreadId() << ": Waiting to write to output buffer..." << endl;
waitResult = WaitForSingleObject(fMutex, INFINITE);
switch(waitResult)
{
// The thread got ownership of the mutex
case WAIT_OBJECT_0:
//cout << "Thread " << GetCurrentThreadId() << ": WRITING TO OUTPUT BUFFER..." << endl;
filterOutputBuffer.push_back(*out);
break;
default:
cout << "Thread " << GetCurrentThreadId() << ": Error acquiring filter mutex..." << endl;
}
if(!ReleaseMutex(fMutex))
cout << "Thread " << GetCurrentThreadId() << ": Error releasing filter mutex..." << endl;
}
}
void displayImage()
{
DWORD waitResult;
IplImage final;
int c;
cvNamedWindow("Image", CV_WINDOW_AUTOSIZE);
//start grabbing frames from the vector ouputBuffer
cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from output buffer..." << endl;
while (true)
{
waitResult = WaitForSingleObject(oMutex, INFINITE);
switch(waitResult)
{
// The thread got ownership of the mutex
case WAIT_OBJECT_0:
if(ofOutputBuffer.size() > 0)
{
//cout << "Thread " << GetCurrentThreadId() << ": Reading output buffer..." << endl;
final = ofOutputBuffer[0];
ofOutputBuffer.erase(ofOutputBuffer.begin());
}
else
{
if(!ReleaseMutex(oMutex))
cout << "Thread " << GetCurrentThreadId() << ": Error releasing output mutex..." << endl;
//else cout << "Thread " << GetCurrentThreadId() << ": Output Buffer is empty!" << endl;
continue;
}
break;
default:
cout << "Thread " << GetCurrentThreadId() << ": Error acquiring output mutex..." << endl;
continue;
}
if(!ReleaseMutex(oMutex))
cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
//else cout << "Thread " << GetCurrentThreadId() << ": Done reading output buffer, mutex Released!" << endl;
//cout << "Thread " << GetCurrentThreadId() << ": Displaying Image..." << endl;
cvShowImage("Image", &final);
c = cvWaitKey(1);
}
cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl;
}
Вот основная функция:
void main()
{
hMutex = CreateMutex(NULL, FALSE, NULL);
oMutex = CreateMutex(NULL, FALSE, NULL);
fMutex = CreateMutex(NULL, FALSE, NULL);
hDoneGettingFrame = CreateEvent(NULL, TRUE, FALSE, NULL);
hDoneReadingFrame = CreateEvent(NULL, TRUE, FALSE, NULL);
TName[0]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)writeBuffer, NULL, 0, &ThreadID);
TName[1]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)filterImage, NULL, 0, &ThreadID);
TName[2]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)opticalFlow, NULL, 0, &ThreadID);
TName[3]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)displayImage, NULL, 0, &ThreadID);
WaitForMultipleObjects(4, TName, TRUE, INFINITE);
CloseHandle(TName);
}