Проблемы параллелизма с QThreads. Потоки, принимающие один и тот же сигнал, блокируют друг друга - PullRequest
4 голосов
/ 18 января 2011

Итак, я работаю над программой, которая обрабатывает видео в режиме реального времени, и у меня возникают проблемы с тем, что потоки "блокируют" друг друга.

Моя система настроена так:

         DataSourceThread
             /      \
            /        \
           /          \
     Receiver       Receiver
         /              \
        /                \ 
       /                  \
 Processor1            Processor2

(Все эти классы расширяют QThread.)

Таким образом, DataSourceThread извлекает кадры из видеопотока и передает сигнал, содержащий этот кадр, получателям. Тип подключения: Qt :: DirectConnection

Приемники в основном получают кадры, отправленные DataSourceThread, и если Процессор завершит обработку предыдущего кадра, он отправит Процессору сигнал, содержащий кадр. Тип подключения: Qt :: QueuedConnection. Если процессор не завершил обработку предыдущего кадра, он просто вернется без выдачи сигнала (пропуск кадров).

Чтобы проверить, работает ли это, все, что я сделал, это заставил Processor1 просто распечатать сообщение, когда он получил кадр, а Processor2 сделал QThread :: sleep (3); и распечатать сообщение.

(Получатели также сделают глубокую копию кадра, прежде чем передать его процессорам.)

Ожидаемый результат:

Процессор1 должен постоянно печатать сообщения. Processor2 должен печатать сообщение каждые 3 секунды.

Проблема:

Оба процессора печатают свои сообщения одновременно (каждые 3 секунды). Processor1 ждет, пока Processor2 завершит свою работу, прежде чем печатать свое сообщение. Таким образом, вывод выглядит примерно так:

"Message from processor1"
"Message from processor2"
"Message from processor1"
"Message from processor2"
"Message from processor1"
"Message from processor2"

и т. Д.

У меня заканчиваются идеи здесь, поэтому любая помощь будет принята с благодарностью!

EDIT: Вот часть кода:

main.cpp:

int main(int argc, char *argv[])
{
    QApplication app(argc, argv);

    DataSourceThread dataSourceThread;
    dataSourceThread.start();

    GUIThread *guiProcessor = new GUIThread();
    FrameReceiver *guiReceiver = new FrameReceiver(guiProcessor, 0);

    QObject::connect(
        &dataSourceThread, SIGNAL(frameReceived(Frame*)),
        guiReceiver, SLOT(receive(Frame*)),
        Qt::DirectConnection
    );

    DetectorThread *detectorProcessor = new DetectorThread();
    FrameReceiver *detectorReceiver = new FrameReceiver(detectorProcessor, 0);

    QObject::connect(
        &dataSourceThread, SIGNAL(frameReceived(Frame*)),
        detectorReceiver, SLOT(receive(Frame*)),
        Qt::DirectConnection
    );

    return app.exec();
}  

Из DataSourceThread.cpp:

void DataSourceThread::run()
{
    ... stuff ...

    while (true) {
        image = cvQueryFrame(capture);

        if (!image) { 
            qDebug() << QString("Could not capture frame"); 
            continue;
        }

        cvReleaseImage(&temp_image);
        temp_image = cvCreateImage(cvSize(640, 480), image->depth, 3);

        cvResize(image, temp_image, 1);

        frame->lock();
        frame->setImage(temp_image);
        frame->unlock();

        emit frameReceived(frame);

        msleep(1); 
    }
} 

FrameReceiver.cpp:

FrameReceiver::FrameReceiver(FrameProcessor* processor, QObject *parent) : QThread(parent) {
    m_ready = true;

    m_processor = processor;
    m_processor->start();

    QObject::connect(
        (QObject*)this, SIGNAL(frameReceived(Frame*)),
        m_processor, SLOT(receive(Frame*)), 
        Qt::QueuedConnection
    );

    QObject::connect(
        m_processor, SIGNAL(ready()),
        (QObject*)this, SLOT(processCompleted()),
        Qt::DirectConnection
    ); }

void FrameReceiver::processCompleted() {
    m_ready = true; }

void FrameReceiver::receive(Frame *frame) {
    if (m_ready == true) {
        m_ready = false;
        frame->lock();
        Frame *f = new Frame(*frame);
        frame->unlock();
        emit frameReceived(f);
    } else {
        // SKIPPED THIS FRAME
    }
}

GUIThread.cpp: (Процессор1)

GUIThread::GUIThread(QObject *parent) : FrameProcessor(parent)
{
    m_frame = new Frame();
}

void GUIThread::setFrame(Frame *frame)
{ 
    qDebug() << QString("Guithread received frame");
}    

FrameProcessor.cpp

// (The processors extend this class)
void FrameProcessor::receive(Frame *frame)
 {
     setFrame(frame);
     delete frame;
     emit ready();
 }

DetectorThread (Processor2) делает то же самое, что и guithread, но с 3-секундным сном в setFrame.

1 Ответ

3 голосов
/ 19 января 2011

Я думаю, что отчасти проблема в том, что все ваши объекты QObject принадлежат основному потоку приложения. Это означает, что все они совместно используют один цикл обработки событий для доставки асинхронных сигналов, эффективно сериализуя всю цепочку обработки.

Я думаю, что правильный способ настроить это будет примерно так:

GUIProcessor *guiProcessor = new GUIProcessor();
QThread guiProcessorThread;
guiProcessor.moveToThread(&guiProcessorThread);

FrameReceiver *guiReceiver = new FrameReceiver(guiProcessor, 0);
QThread guiReceiverThread;
guiReceiver.moveToThread(&guiReceiverThread);

guiProcessorThread.start();
guiReceiverThread.start();

Если вы сделаете это таким образом, я бы предложил не использовать DirectConnection между потоками, а BlockingQueuedConnection, если вы хотите, чтобы текущий кадр обрабатывался до захвата следующего.

См. Это: http://labs.qt.nokia.com/2010/06/17/youre-doing-it-wrong/

А это: http://labs.qt.nokia.com/2006/12/04/threading-without-the-headache/

Надеюсь, это поможет!

РЕДАКТИРОВАТЬ: Для ясности, с моим предложением ваши классы будут наследовать QObject вместо QThread.

...