Многопоточность OpenCV (Windows / .NET) задерживает несколько секунд с захвата видео - PullRequest
0 голосов
/ 26 февраля 2012

У меня есть многопоточная программа openCV, которая использует 4 потока для выполнения следующих действий:

Поток 1-> вызовы cvQueryFrame(), который захватывает изображения кадра с камеры по одному и сохраняет их в std::vector inputBuffer

Поток 2-> выполняет пороговое значение для inputBuffer[0], копирует результат в другой std::vector с именем filterOutputBuffer

Поток 3-> выполняет алгоритм оптического потока / рисует поле потокадля первых двух элементов в filterOutputBuffer копирует результат в другой std::vector с именем ofOutputBuffer

Thread 4-> отображает изображение, используя cvShowImage(ofOutputBuffer[0])

Так что, по сути, я предполагал, что каждый поток выполняетзадание на первый элемент соответствующего входного вектора / буфера и сохранение результата на обратной стороне соответствующего выходного вектора.Вроде как 3 фабричных рабочих делают свою часть на конвейере, а затем бросают конечный результат в ведро для следующего парня.

Я устанавливаю взаимные исключения для всех буферов, и программа работает, только выводзадерживается на несколько секунд из потока живой камеры.

Я запустил не многопоточную версию той же программы (в которой использовался один гигантский цикл while (true)), и она работала в режиме реального времени только со случайным заиканием.

Почему моя параллельная реализация так сильно задерживается в производительности?

Ниже приведены функции потоков:

    void writeBuffer()
    {
        cout << "Thread " << GetCurrentThreadId() << ": Capturing frame from camera!" << endl;
        CvCapture *capture = 0;
        IplImage *frame = 0;
        DWORD waitResult;

        if (!(capture = cvCaptureFromCAM(0)))
            cout << "Cannot initialize camera!" << endl;

        //now start grabbing frames and storing into the vector inputBuffer
        while (true)
        {
            //cout << "Thread " << GetCurrentThreadId() << ": Waiting for mutex to write to input buffer!..." << endl;
            waitResult = WaitForSingleObject(hMutex, INFINITE);
            switch(waitResult) 
            {
                // The thread got ownership of the mutex
                case WAIT_OBJECT_0:
                    frame = cvQueryFrame(capture); //store the image into frame
                    if(!frame)
                    {
                        cout << "Thread " << GetCurrentThreadId() << ": Error capturing frame from camera!" << endl;
                    }
                    //cout << "Thread " << GetCurrentThreadId() << ": Getting Frame..." << endl;
                    inputBuffer.push_back(*frame);
                break; 
                default:
                    cout << "Thread " << GetCurrentThreadId() << ": Error acquiring mutex..." << endl;
            }
            if(!ReleaseMutex(hMutex)) 
            { 
                cout << "Thread " << GetCurrentThreadId() << ": Error releasing mutex..." << endl;
            }
            //else cout << "Thread " << GetCurrentThreadId() << ": Done writing to input buffer, Mutex Released!" << endl;
            //signal hDoneGettingFrame
            PulseEvent(hDoneGettingFrame);
        }
            cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl;
    }


    void opticalFlow()
    {
    ...
        DWORD waitResult;

        //start grabbing frames from the vector inputBuffer
        cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from input buffer..." << endl;
        while(true)
        {
            waitResult = WaitForSingleObject(fMutex, INFINITE);
            switch(waitResult) 
            {
                // The thread got ownership of the mutex
                case WAIT_OBJECT_0: 
                    //grab first two frames from buffer (inputBuffer[0-1]) and process them
                    if(filterOutputBuffer.size() > 1)
                    {   
                        frame1 = filterOutputBuffer[0];
                        frame2 = filterOutputBuffer[1];
                        filterOutputBuffer.erase(filterOutputBuffer.begin());
                    }
                    else 
                    {
                        if(!ReleaseMutex(fMutex)) 
                            cout << "Thread " << GetCurrentThreadId() << ": Error releasing filter mutex..." << endl;
                        //else cout << "Thread " << GetCurrentThreadId() << ": Input Buffer empty!" << endl;
                        continue;
                    }
                break; 
                default:
                    cout << "Thread " << GetCurrentThreadId() << ": Error acquiring input mutex..." << endl;
                    continue;
            }
            if(!ReleaseMutex(fMutex)) 
            { 
                cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
            }
    ...
    //Do optical flow stuff
    ...
    waitResult = WaitForSingleObject(oMutex, INFINITE);
            switch(waitResult)
            {
                // The thread got ownership of the mutex
                case WAIT_OBJECT_0:
                    //cout << "Thread " << GetCurrentThreadId() << ": WRITING TO OUTPUT BUFFER..." << endl;
                    ofOutputBuffer.push_back(*frame1_3C);
                break;
                default:
                    cout << "Thread " << GetCurrentThreadId() << ": Error acquiring output mutex..." << endl;
            }
            if(!ReleaseMutex(oMutex)) 
                cout << "Thread " << GetCurrentThreadId() << ": Error releasing output mutex..." << endl;
    }
        cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl;
    }

    void filterImage()
{
    DWORD waitResult;
...

    //start grabbing frames from the vector inputBuffer
    cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from input buffer..." << endl;
    while(true)
    {
        waitResult = WaitForSingleObject(hMutex, INFINITE);
        switch(waitResult) 
        {
            // The thread got ownership of the mutex
            case WAIT_OBJECT_0: 
                //grab first frame and then release mutex
                if(inputBuffer.size() > 0)
                {   
                    frame = inputBuffer[0];
                    inputBuffer.erase(inputBuffer.begin());
                }
                else 
                {
                    if(!ReleaseMutex(hMutex)) 
                        cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
                    //else cout << "Thread " << GetCurrentThreadId() << ": Input Buffer empty!" << endl;
                    continue;
                }
            break; 
            default:
                cout << "Thread " << GetCurrentThreadId() << ": Error acquiring input mutex..." << endl;
                continue;
        }
        if(!ReleaseMutex(hMutex)) 
        { 
            cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
        }
...
//Tresholding Image Stuff
...
        //cout << "Thread " << GetCurrentThreadId() << ": Waiting to write to output buffer..." << endl;
        waitResult = WaitForSingleObject(fMutex, INFINITE);
        switch(waitResult)
        {
            // The thread got ownership of the mutex
            case WAIT_OBJECT_0:
                //cout << "Thread " << GetCurrentThreadId() << ": WRITING TO OUTPUT BUFFER..." << endl;
                filterOutputBuffer.push_back(*out);
            break;
            default:
                cout << "Thread " << GetCurrentThreadId() << ": Error acquiring filter mutex..." << endl;
        }
        if(!ReleaseMutex(fMutex)) 
            cout << "Thread " << GetCurrentThreadId() << ": Error releasing filter mutex..." << endl;

    }
}

void displayImage()
{
    DWORD waitResult;
    IplImage final;
    int c;
    cvNamedWindow("Image", CV_WINDOW_AUTOSIZE);
    //start grabbing frames from the vector ouputBuffer
    cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from output buffer..." << endl;
    while (true)
    {
            waitResult = WaitForSingleObject(oMutex, INFINITE);
            switch(waitResult) 
            {
                    // The thread got ownership of the mutex
                    case WAIT_OBJECT_0:
                        if(ofOutputBuffer.size() > 0)
                        {
                            //cout << "Thread " << GetCurrentThreadId() << ": Reading output buffer..." << endl;
                            final = ofOutputBuffer[0];
                            ofOutputBuffer.erase(ofOutputBuffer.begin());
                        }
                        else 
                        {
                            if(!ReleaseMutex(oMutex)) 
                                cout << "Thread " << GetCurrentThreadId() << ": Error releasing output mutex..." << endl;
                            //else cout << "Thread " << GetCurrentThreadId() << ": Output Buffer is empty!" << endl;
                            continue;
                        }
                    break;
                    default:
                        cout << "Thread " << GetCurrentThreadId() << ": Error acquiring output mutex..." << endl;
                        continue;
            }
            if(!ReleaseMutex(oMutex)) 
                cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
            //else cout << "Thread " << GetCurrentThreadId() << ": Done reading output buffer, mutex Released!" << endl;

            //cout << "Thread " << GetCurrentThreadId() << ": Displaying Image..." << endl;
            cvShowImage("Image", &final);
            c = cvWaitKey(1);
    }
    cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl;
}

Вот основная функция:

void main()
{
    hMutex = CreateMutex(NULL, FALSE, NULL);
    oMutex = CreateMutex(NULL, FALSE, NULL);
    fMutex = CreateMutex(NULL, FALSE, NULL);

    hDoneGettingFrame = CreateEvent(NULL, TRUE, FALSE, NULL);
    hDoneReadingFrame = CreateEvent(NULL, TRUE, FALSE, NULL);

    TName[0]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)writeBuffer, NULL, 0, &ThreadID);
    TName[1]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)filterImage, NULL, 0, &ThreadID);
    TName[2]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)opticalFlow, NULL, 0, &ThreadID);
    TName[3]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)displayImage, NULL, 0, &ThreadID);
    WaitForMultipleObjects(4, TName, TRUE, INFINITE);
    CloseHandle(TName);
}

Ответы [ 4 ]

0 голосов
/ 29 февраля 2012

Семафоры сделали свое дело! Вместо того, чтобы использовать отдельные мьютексы, я просто создал семафор и позволил всем потокам работать через него.

Спасибо, теперь он работает быстро и плавно!

void main()
{
    hSemaphore = CreateSemaphore( 
        NULL,           // default security attributes
        MAX_THREADS,  // available count (when a thread enters, it decreases)
        MAX_THREADS,  // maximum count
        NULL);          // unnamed semaphore

    if (hSemaphore == NULL) 
    {
        printf("CreateSemaphore error: %d\n", GetLastError());
        return;
    }


    TName[0]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)writeBuffer, NULL, 0, &ThreadID);
    TName[2]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)opticalFlow, NULL, 0, &ThreadID);
    TName[3]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)displayImage, NULL, 0, &ThreadID);
    WaitForMultipleObjects(4, TName, TRUE, INFINITE);
    CloseHandle(TName);
}   

А в темах ...

   //instead of separate Mutexes, just wait for semaphore
    waitResult = WaitForSingleObject(hSemaphore, INFINITE);
                switch(waitResult) 
                {

                            ...

                            }
                if(!ReleaseSemaphore(hSemaphore, 1, NULL)) 
                    cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
0 голосов
/ 26 февраля 2012

Попробуйте использовать threadPool, чтобы минимизировать время, затрачиваемое ЦП на перемещение между потоками.

0 голосов
/ 26 февраля 2012

Ну, для начала, если я немного разверну цикл вашей первой функции потока:

if(!ReleaseMutex(hMutex)){} 
PulseEvent(hDoneGettingFrame);
waitResult = WaitForSingleObject(hMutex, INFINITE);

Другими словами, ваш первый поток удерживает мьютекс очереди почти на протяжении всего цикла первого потока, поэтому второй поток не может никуда попасть. Я предполагаю, что код всех других потоков одинаков?

При перемещении данных по очередям производитель-потребитель в конвейере идея заключается в том, что вы должны держать очередь заблокированной только в течение минимального времени. Выполните обработку буфера объекта, затем заблокируйте очередь, нажмите на ссылку на объект и сразу же разблокируйте очередь. Затем подайте сигнал семафору (или тому подобное), чтобы следующий поток мог обработать объект, когда это возможно.

Не держите очередь заблокированной! Мьютекс не должен быть там, чтобы другой поток ожидал работы - он защищает очередь от множественного доступа. Вам нужны другие сигналы для поддержания количества очередей и потоков, ожидающих работы. Не используйте событие для этого, независимо от того, сколько других примеров вы видели в сети - используйте семафор, если вам нужно выкатить собственную очередь производитель-потребитель.

Лучше - используйте класс очереди P-C, который уже работает - посмотрите на классы BlockingCollections.

0 голосов
/ 26 февраля 2012

Многопоточное приложение делит время ЦП между потоками.Следовательно, существуют переключатели контекста, когда другой поток хотел находиться в состоянии выполнения.Возможно, переключение между потоками увеличивает время процессора, в результате чего приложение замедляется.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...