QtConcurrent: почему releaseThread и reserveThread вызывают взаимоблокировку? - PullRequest
0 голосов
/ 13 декабря 2018

В Qt 4.7 Reference для QThreadPool мы находим:

void QThreadPool::releaseThread()

Освобождает поток, ранее зарезервированный вызовом для reserveThread().

Примечание. Вызов этой функции без предварительного резервирования потока временно увеличивает maxThreadCount(). Это полезно, когда поток переходит в режим ожидания, ожидая дополнительной работы, что позволяет другим потокам продолжить работу.Обязательно вызовите reserveThread(), когда закончите ожидание, чтобы пул потоков мог правильно поддерживать activeThreadCount().

См. Также reserveThread().


void QThreadPool::reserveThread()

Резервирует один поток, игнорируя activeThreadCount() и maxThreadCount().

Как только вы закончите с потоком, позвоните releaseThread(), чтобы разрешить его повторное использование.

Примечание: эта функция всегда увеличивает количество активных потоков.Это означает, что с помощью этой функции можно activeThreadCount() вернуть значение больше maxThreadCount().

См. Также releaseThread().

Я хочу использоватьreleaseThread(), чтобы сделать возможным использование вложенной параллельной карты, но в следующем коде она висит в waitForFinished():

#include <QApplication>
#include <QMainWindow>
#include <QtConcurrentMap>
#include <QtConcurrentRun>
#include <QFuture>
#include <QThreadPool>
#include <QtTest/QTest>
#include <QFutureSynchronizer>

struct Task2 { // only calculation
    typedef void result_type;
    void operator()(int count) {
        int k = 0;
        for (int i = 0; i < count * 10; ++i) {
            for (int j = 0; j < count * 10; ++j) {
                k++;
            }
        }
        assert(k >= 0);
    }
};

struct Task1 { // will launch some other concurrent map
    typedef void result_type;
    void operator()(int count) {

        QVector<int> vec;
        for (int i = 0; i < 5; ++i) {
            vec.push_back(i+count);
        }
        Task2 task;

        QFuture<void> f = QtConcurrent::map(vec.begin(), vec.end(), task);
        {
            // with out releaseThread before wait, it will hang directly
            QThreadPool::globalInstance()->releaseThread();
            f.waitForFinished(); // BUG: may hang there
            QThreadPool::globalInstance()->reserveThread();
        }
    }
};


int main() {
    QThreadPool* gtpool = QThreadPool::globalInstance();
    gtpool->setExpiryTimeout(50);
    int count = 0;
    for (;;) {
        QVector<int> vec;
        for (int i = 0; i < 40 ; i++) {
            vec.push_back(i);
        }
        // launch a task with nested map
        Task1 task; // Task1 will have nested concurrent map
        QFuture<void> f = QtConcurrent::map(vec.begin(), vec.end(),task);

        f.waitForFinished(); // BUG: may hang there

        count++;

        // waiting most of thread in thread pool expire
        while (QThreadPool::globalInstance()->activeThreadCount() > 0) {
            QTest::qSleep(50);
        }

        // launch a task only calculation
        Task2 task2;
        QFuture<void> f2 = QtConcurrent::map(vec.begin(), vec.end(), task2);

        f2.waitForFinished(); // BUG: may hang there

        qDebug() << count;
    }
    return 0;
}

Этот код не будет работать вечно;он будет зависать через много циклов (1 ~ 10000), все потоки ожидают условную переменную.

Мои вопросы:

  1. Почему зависает?
  2. Могу ли я исправить это и сохранить вложенную параллельную карту?

dev env:

Linux version 2.6.32-696.18.7.el6.x86_64;Qt4.7.4;GCC 3.4.5

Windows 7;Qt4.7.4;Mingw 4.4.0

Ответы [ 2 ]

0 голосов
/ 24 декабря 2018

@ tungIt ответ достаточно хороший, я нашел коммит qtbug и fix, просто для справки:

https://bugreports.qt.io/browse/QTBUG-3786

https://github.com/qt/qtbase/commit/a9b6a78e54670a70b96c122b10ad7bd64d166514#diff-6d5794cef91df41c39b5e7cc6b71d041

0 голосов
/ 13 декабря 2018

Программа зависает из-за состояния гонки в QThreadPool при попытке справиться с expiryTimeout.Вот подробный анализ:

Проблема в QThreadPool - source

При запуске задачи QThreadPool что-то делал построки:

QMutexLocker locker(&mutex);

taskQueue.append(task); // Place the task on the task queue
if (waitingThreads > 0) {
   // there are already running idle thread. They are waiting on the 'runnableReady' 
   // QWaitCondition. Wake one up them up.
   waitingThreads--;
   runnableReady.wakeOne();
} else if (runningThreadCount < maxThreadCount) {
   startNewThread(task);
}

И основной цикл потока выглядит так:

void QThreadPoolThread::run()
{
  QMutexLocker locker(&manager->mutex);
  while (true) {
    /* ... */
    if (manager->taskQueue.isEmpty()) {
      // no pending task, wait for one.
      bool expired = !manager->runnableReady.wait(locker.mutex(), 
                                                  manager->expiryTimeout);
      if (expired) {
        manager->runningThreadCount--;
        return;
      } else {
        continue;
      }
    }
    QRunnable *r = manager->taskQueue.takeFirst();
    // run the task
    locker.unlock();
    r->run();
    locker.relock();
  }
}

Идея состоит в том, что поток будетподождать заданное количество секунд для задачи, но если за заданный промежуток времени задача не была добавлена, поток истекает и завершается.Проблема в том, что мы полагаемся на возвращаемое значение runnableReady. Если есть задача, которая запланирована точно в то же время, когда истекает поток, то поток увидит false и истечет.Но основной поток не будет перезапускать любой другой поток.Это может привести к зависанию приложения, поскольку задача никогда не будет запущена.

Быстрый обходной путь - использовать длинный expiryTime (по умолчанию 30000) и удалить цикл while, который ожидает потоки.expired.

Вот модифицированная основная функция, программа гладко работает в Windows 7, по умолчанию используются 4 потока:

int main() {
    QThreadPool* gtpool = QThreadPool::globalInstance();
    //gtpool->setExpiryTimeout(50); <-- don't set the expiry Timeout, use the default one.
    qDebug() << gtpool->maxThreadCount();

    int count = 0;
    for (;;) {

        QVector<int> vec;
        for (int i = 0; i < 40 ; i++) {
            vec.push_back(i);
        }
        // launch a task with nested map
        Task1 task; // Task1 will have nested concurrent map
        QFuture<void> f = QtConcurrent::map(vec.begin(), vec.end(),task);

        f.waitForFinished(); // BUG: may hang there

        count++;

        /*
        // waiting most of thread in thread pool expire
        while (QThreadPool::globalInstance()->activeThreadCount() > 0)
        {
            QTest::qSleep(50);
        }
        */

        // launch a task only calculation
        Task2 task2;
        QFuture<void> f2 = QtConcurrent::map(vec.begin(), vec.end(), task2);

        f2.waitForFinished(); // BUG: may hang there

        qDebug() << count ;
    }
    return 0;
}
...