Выполнение потока Qt не останавливается после вызова метода exit () quit () или terminate () - PullRequest
2 голосов
/ 31 марта 2020

Прежде всего, я прочитал QThread s и использую QEventL oop, но я не совсем уверен, что моя реализация верна.

TL; DR см. Описание проблемы ниже.

Наиболее полезными источниками информации являются Qt Wiki , KDAB Qthread презентация (полезно для w / o без события l oop), SO сообщения здесь и здесь , относящиеся к этому вопросу.

Мой сценарий:

У меня есть потенциально очень долго работающая функция с несколькими вызовами на дисках ввода / вывода. Таким образом, мне нужен поток, чтобы не блокировать пользовательский интерфейс. Для этого я создал собственную реализацию Thread.

TL; DR QThreads

Насколько я понимаю, QThread - это отдельное событие. l oop объект и требует либо пользовательской реализации run () , либо перемещения объекта во вновь созданный поток object , в котором перемещенные объекты живут (и работают) , Я описал это с реализацией события l oop.

Проблема

Возможно, мне что-то не хватает, так как моя реализация этого, что я описал выше, делает не работает правильно. Откуда мне это знать, хорошо, что в Qt Docs и SO выше упоминается, что QThread :: quit () или QThread :: exit () зависят от QEventL oop, и если QThread :: exe c () не был запущен (путем вызова QThread :: run () через QThread :: start () ), затем функции quit () или exit () никогда не запустятся, , что является моей проблемой .

Моя философия реализации похожа на синтаксис Thread & Lambda Java, например

new Thread(() -> { // some code to run in a different thread}).start();

Я использовал следующую реализацию

Контейнерный объект типа Thread, где можно использовать лямбды

QWorkerThread: public QObject

    // This is the thread that runs object below
----QWaitThread : public QThread

    // This is the object which lives inside the above thread
----ThreadWorker : public QObject, public QInterruptable

Простой пример использования будет (очистка потока и дочерних объектов выполняется внутри QWorkerThread):

QWorkerThread *workerThread = new QWorkerThread;
workerThread->setRunnable([](){
    // insert CPU intensive or long running task here
});
workerThread->start();

Описание проблемы / пример

// somewhere in main UI thread
workerThread->stop(); // or workerThread->kill() 

, который вызывает QThread::quit() или QThread::quit(), затем QThread::terminate(), за которым следует QThread::wait(), не прервет поток , Длительный процесс, определенный в лямбде (внутри setRunnable()), будет работать до тех пор, пока он не будет завершен.

Я знаю, что этот пост длиннее обычного, но я бы предпочел, чтобы все получили полное "изображение" о том, чего я пытаюсь достичь, так как я не уверен, в чем заключается моя проблема.

Буду признателен за любую помощь!


Реализация кода

Я буду публиковать весь код для полной идеи реализации, если я упущу что-то важное.

QWaitThread.h - реализация QThread

#ifndef QWAITTHREAD_H
#define QWAITTHREAD_H

#include <QObject>
#include <QThread>
#include <QWaitCondition>
#include <QMutex>

class QWaitThread : public QThread
{
    Q_OBJECT
public:
    explicit QWaitThread(QObject *parent = nullptr);
    ~QWaitThread();
    virtual void pause();
    virtual void resume();    

signals:
    void paused();
    void resumed();

public slots:
    void pausing();
    void resuming();

private:
    QWaitCondition *waitCondition;
    QMutex mutex;
};

#endif // QWAITTHREAD_H

QWaitThread. cpp

#include "qwaitthread.h"

QWaitThread::QWaitThread(QObject *parent) : QThread(parent)
{
    waitCondition = new QWaitCondition;
}

QWaitThread::~QWaitThread()
{
    if(waitCondition != nullptr) {
        delete waitCondition;
    }
}

void QWaitThread::pause()
{
    emit paused();
    waitCondition->wait(&mutex);
}

void QWaitThread::resume()
{
    waitCondition->wakeAll();
    emit resumed();
}

void QWaitThread::pausing()
{
    pause();
}

void QWaitThread::resuming()
{
    resume();
}

Интерфейс QInterruptable.h определяет некоторые ожидаемые функции

#ifndef QINTERRUPTABLE_H
#define QINTERRUPTABLE_H

class QInterruptable {
public:
    virtual void pause() = 0;
    virtual void resume() = 0;
    virtual void interrupt() = 0;
    virtual ~QInterruptable() = default;
};

#endif // QINTERRUPTABLE_H

ThreadWorker.h - это объект, который живет (и работает) внутри QWaitThread

#ifndef THREADWORKER_H
#define THREADWORKER_H

#include <QObject>
#include <functional>
#include <QWaitCondition>
#include <QMutex>

#include "QInterruptable.h"

class ThreadWorker : public QObject, public QInterruptable
{
    Q_OBJECT

private:
    QMutex mutex;
    QWaitCondition *waitCondition;
    std::function<void ()> runnable;
    bool shouldPause = false;

public:
    explicit ThreadWorker(QObject *parent = nullptr);
    ThreadWorker(std::function<void ()> func);
    ~ThreadWorker();

    void setRunnable(const std::function<void ()> &value);

signals:
    /**
     * Emitted when the QWorkerThread object has started work
     * @brief started
     */
    void started();

    /**
     * @brief progress reports on progress in method, user defined.
     * @param value reported using int
     */
    void progress(int value);

    /**
     * Emitted when the QWorkerThread object has finished its work, same signal is used from &QThread::finished
     * @brief started
     */
    void finished();

    /**
     * Emitted when the QWorkerThread has encountered an error, user defined.
     * @brief started
     */
    void error();

public slots:
    virtual void run();
    virtual void cleanup();


    // QInterruptable interface
public:
    void pause()
    {
        shouldPause = true;
    }
    void resume()
    {
        shouldPause = false;
    }
    QMutex& getMutex();
    QWaitCondition *getWaitCondition() const;
    void setWaitCondition(QWaitCondition *value);
    bool getShouldPause() const;

    // QInterruptable interface
public:
    void interrupt()
    {

    }
};

#endif // THREADWORKER_H

ThreadWorker. cpp

#include "threadworker.h"

void ThreadWorker::setRunnable(const std::function<void ()> &value)
{
    runnable = value;
}

QMutex& ThreadWorker::getMutex()
{
    return mutex;
}

QWaitCondition *ThreadWorker::getWaitCondition() const
{
    return waitCondition;
}

void ThreadWorker::setWaitCondition(QWaitCondition *value)
{
    waitCondition = value;
}

bool ThreadWorker::getShouldPause() const
{
    return shouldPause;
}

ThreadWorker::ThreadWorker(QObject *parent) : QObject(parent)
{
    waitCondition = new QWaitCondition;
}

ThreadWorker::ThreadWorker(std::function<void ()> func): runnable(func) {
    waitCondition = new QWaitCondition;
}


ThreadWorker::~ThreadWorker()
{    
    if(waitCondition != nullptr){
        delete waitCondition;
    }
}

void ThreadWorker::run()
{
    emit started();
    runnable();
    emit finished();
}

void ThreadWorker::cleanup()
{

}

QWorkerThread.h основной класс интересов, где runnable lambda принято и где происходит обработка основного потока, перемещение в поток, запуск потока, обработка событий и т. д. c

#ifndef QWORKERTHREAD_H
#define QWORKERTHREAD_H

#include <QObject>
#include <functional>
#include <QThread>
#include <QEventLoop>

#include "qwaitthread.h"
#include "threadworker.h"

class QWorkerThread: public QObject
{
    Q_OBJECT

public:

    enum State {
        Running,
        Paused,
        NotRunning,
        Finished,
        Waiting,
        Exiting
    };

    QWorkerThread();
    explicit QWorkerThread(std::function<void ()> func);
    ~QWorkerThread();
    static QString parseState(QWorkerThread::State state);
    virtual void setRunnable(std::function <void()> runnable);
    virtual void start(QThread::Priority priority = QThread::Priority::InheritPriority);
    virtual void stop();
    virtual void wait(unsigned long time = ULONG_MAX);
    virtual void kill();
    virtual void setWorkerObject(ThreadWorker *value);

    virtual void pause();
    virtual void resume();
    virtual QWaitThread *getWorkerThread() const;

    State getState() const;


signals:
    /**
     * Emitted when the QWorkerThread object has started work
     * @brief started
     */
    void started();

    /**
     * @brief progress reports on progress in method, user defined.
     * @param value reported using int
     */
    void progress(int value);

    /**
     * Emitted when the QWorkerThread object has finished its work, same signal is used from &QThread::finished
     * @brief started
     */
    void finished();

    /**
     * Emitted when the QWorkerThread has encountered an error, user defined.
     * @brief started
     */
    void error();

private:
    /**
     * @brief workerObject - Contains the object and 'method' that will be moved to `workerThread`
     */
    ThreadWorker *workerObject = nullptr;

    /**
     * @brief workerThread - Worker Thread is seperate thread that runs the method
     */
    QWaitThread *workerThread = nullptr;

    State state = State::NotRunning;

};

#endif // QWORKERTHREAD_H

QWorkerThread. cpp реализация

#include "qworkerthread.h"

QWorkerThread::QWorkerThread()
{
    state = State::NotRunning;
    workerThread = new QWaitThread;
    workerObject = new ThreadWorker;
    workerThread->setObjectName("WorkerThread");
}

QWorkerThread::QWorkerThread(std::function<void ()> func)
{
    state = State::NotRunning;
    workerThread = new QWaitThread;
    workerObject = new ThreadWorker(func);
    workerThread->setObjectName("WorkerThread");
}

QWorkerThread::~QWorkerThread()
{
    //  Check if worker thread is running
    if(workerThread->isRunning()) {

        // Exit thread with -1
        workerThread->exit(-1);
    }

    if(!workerThread->isFinished()) {
        workerThread->wait(500);

        if(workerThread->isRunning()) {
            workerThread->terminate();
        }
    }

    // cleanup
    delete workerObject;
    delete workerThread;
}

void QWorkerThread::setRunnable(std::function<void ()> runnable)
{
    workerObject->setRunnable(runnable);
}

void QWorkerThread::start(QThread::Priority priority)
{

    state = State::Running;
    // Connect workerThread start signal to ThreadWorker object's run slot
    connect(workerThread, &QThread::started, workerObject, &ThreadWorker::started);
    connect(workerThread, &QThread::started, workerObject, &ThreadWorker::run);

    // Connect threadWorker progress report to this progress report
    connect(workerObject, &ThreadWorker::progress, this, &QWorkerThread::progress);

    // Cleanup
    connect(workerObject, &ThreadWorker::finished, this, [this](){
        state = State::Finished;
        emit finished();
    });
    connect(workerThread, &QWaitThread::finished, this, [this] {
        workerObject->deleteLater();
    });

    // move workerObject to thread
    workerObject->moveToThread(workerThread);

    // emit signal that we are starting
    emit started();

    // Start WorkerThread which invokes object to start process method
    workerThread->start(priority);
}

void QWorkerThread::stop()
{
    state = State::Exiting;
    // Exit thread safely with success
    workerThread->quit();

    emit finished();
}

void QWorkerThread::wait(unsigned long time)
{
    state = State::Waiting;
    workerThread->wait(time);
}

void QWorkerThread::kill()
{
    // try stopping
    stop();

    // check if still running
    if(workerThread->isRunning()){
        // forcefully kill
        workerThread->terminate();
        workerThread->wait();
    }

    emit finished();
}

void QWorkerThread::setWorkerObject(ThreadWorker *value)
{
    workerObject = value;
}

QWaitThread *QWorkerThread::getWorkerThread() const
{
    return workerThread;
}

QWorkerThread::State QWorkerThread::getState() const
{
    return state;
}

QString QWorkerThread::parseState(QWorkerThread::State state) {
    switch (state) {
        case Running:
            return "Running";
        case Paused:
            return "Paused";
        case NotRunning:
            return "NotRunning";
        case Finished:
            return "Finished";
        case Waiting:
            return "Waiting";
        case Exiting:
            return "Exiting";
    }

    return QString("Unknown State [%1]").arg(QString::number(state)) ;
}

void QWorkerThread::pause()
{
    workerObject->pause();
    state = State::Paused;
}

void QWorkerThread::resume()
{
    workerObject->resume();
    state = State::Running;
}

Обновление с некоторой дополнительной информацией

Что касается ~QWorkerThread(), я заметил, что при вызове delete QThread или QThread::deleteLater(), QWaitThread() (или QThread) выдаст фатальную ошибку : поток уничтожен, пока он еще работает. Это после того, как quit() / terminate() был вызван.

Следующая строка из QThread. cpp

if (d->running && !d->finished && !d->data->isAdopted)
    qFatal("QThread: Destroyed while thread is still running");

где

d->running == true
d->finished == false
d->data->isAdopted ?

1 Ответ

1 голос
/ 31 марта 2020

Я проверил ваш код, и вот что я понял.
Как вы упоминали, terminate() не полностью останавливает поток. Qt do c говорит:

Завершает выполнение потока. Поток может быть или не быть немедленно прекращен, в зависимости от политик планирования операционной системы. Используйте QThread::wait() после terminate(), чтобы быть уверенным.

К сожалению, wait() зависает даже после terminate(). Возможно, это проблема с вашим кодом, но я создал максимально упрощенный пример, чтобы проверить это, но у него все те же проблемы.

Во-первых, вот часть вашего кода, которую я бы предложил изменить:

QWorkerThread::~QWorkerThread()
{
    ...
    // cleanup
    delete workerObject; // Unsafe, but the only way to call the destructor, if necessary
    delete workerThread; // qFatal
}

Вот что Qt делает c о небезопасности деструктора:

Вызов delete для QObject из потока, отличного от того, которому принадлежит объект ( или доступ к объекту другими способами) небезопасен , если только вы не гарантируете, что объект не обрабатывает события в этот момент. Вместо этого используйте QObject::deleteLater(), и будет опубликовано событие DeferredDelete, которое, в конце концов, обнаружит событие l oop потока объекта. По умолчанию поток, которому принадлежит QObject, является потоком, который создает QObject, но не после вызова QObject::moveToThread().

Примечание. Изменение delete workerThread на workerThread->deleteLater() работает для меня без qFatal.


Хорошо, какие проблемы у нас на самом деле:

  1. Дескриптор подкласса QThread не может быть вызван непосредственно после terminate() из-за qFatal
  2. wait() зависает и не может использоваться после terminate() несмотря на документы

(кажется, проблема актуальна только тогда, когда бесконечная операция перемещается в событие l oop)

Убедитесь, что проблема не в другом месте вашего кода. Минимальный воспроизводимый пример

Worker.h

#pragma once

#include <QObject>

class Worker : public QObject
{
    Q_OBJECT

public:
    ~Worker();

public slots:
    void process();
};

Рабочий. cpp

#include "Worker.h"
#include <QThread>
#include <QDebug>
#include <QDateTime>

Worker::~Worker()
{
    qDebug() << "~Worker()";
}

void Worker::process()
{
    qDebug("Hello World!");

    while(true)
    {
        qDebug() << QDateTime::currentDateTime();
        QThread::msleep(100);
    }
}

MainWin.h

#pragma once

#include <QtWidgets/QMainWindow>

class QThread;
class Worker;

class MainWin : public QMainWindow
{
    Q_OBJECT

public:
    MainWin(QWidget *parent = nullptr);
    ~MainWin();

private:
    QThread*    thread = nullptr;
    Worker*     worker = nullptr;
};

MainWin. cpp

#include "MainWin.h"
#include "Worker.h"
#include <QThread>
#include <QDebug>
#include <QDateTime>

MainWin::MainWin(QWidget *parent)
  : QMainWindow(parent)
{
    thread = new QThread;
    worker = new Worker;

    worker->moveToThread(thread);

    // Start only one infinite operation
    connect(thread, &QThread::started, worker, &Worker::process);

    thread->start();
}

MainWin::~MainWin()
{
    if (thread->isRunning())
    {
        thread->exit(-1);
        thread->wait(500);
    }

    if (thread->isRunning())
    {
        thread->terminate();
    }

    //cleanup
    delete worker;
    delete thread; // qFatal("QThread: Destroyed while thread is still running")
}

Единственный рабочий код, который я нашел

MainWin::~MainWin()
{
    ...
    //cleanup
    delete worker; // Worker destructor will be called, but be note this is unsafe
    thread->deleteLater(); // Allows to avoid qFatal but make thread terminated
}

Выводы и предложения

Все, что я могу предложить, кроме того, чтобы вообще избежать terminate(), это использовать terminate() w ithout wait(), а затем workerThread->deleteLater().

Если трудоемкая операция, которую вы пытаетесь прервать, является вашим собственным кодом, рассмотрите возможность встраивания какого-либо флага завершения в код.

It было бы лучше избегать необработанных указателей и заменять их умными указателями, где это возможно.


Что еще я могу предложить как универсальный способ запуска лямбда-выражений в потоках

Упрощенный пример, как можно используйте lamdas, сигналы-слоты, потоки, начато-законченные сигналы, QtConcurrent::run() и QFuture<>. Таким образом, вы можете добиться как выполнения кода в одном постоянном дополнительном потоке, так и в автоматическом c пуле потоков. Но завершение не поддерживается.

LambdaThread.h

#pragma once

#include <QObject>
#include <functional>
#include <QFuture>

class QThreadPool;

class LambdaThread : public QObject
{
    Q_OBJECT

public:
    // maxThreadCount = -1 to use idealThreadCount by default
    LambdaThread(QObject *parent, int maxThreadCount = -1);

signals:
    void started();
    void finished();

public slots:
    // Invoke this directly or by a signal
    QFuture<void> setRunnable(std::function<void()> func);

private:
    /*
    For the case you need persistent thread sometimes.
    In the case you never need persistent thread,
    just remove m_threadPool from this class at all
    */
    QThreadPool* m_threadPool = nullptr;
};

LambdaThread. cpp

#include "LambdaThread.h"
#include <QtConcurrent/QtConcurrent>
#include <QThreadPool>

LambdaThread::LambdaThread(QObject *parent, int maxThreadCount /*= -1*/)
    : QObject(parent)
{
    m_threadPool = new QThreadPool(this);

    if(maxThreadCount > 0)
    {
        m_threadPool->setMaxThreadCount(maxThreadCount);

        if (maxThreadCount == 1)
        {
            // Avoid thread affinity changing
            m_threadPool->setExpiryTimeout(-1);
        }
    }
}

QFuture<void> LambdaThread::setRunnable(std::function<void()> func)
{
    return QtConcurrent::run(m_threadPool,
        [this, func]()
    {
        // Be note that you actually need event loop in a receiver thread only
        emit started();

        func();

        emit finished();
    });
}

Просто GUI пример класса, где вы можете запустить свои runnables и получать сигналы.

MainWin.h

#pragma once

#include <QtWidgets/QMainWindow>
#include <functional>

class LambdaThread;

class MainWin : public QMainWindow
{
    Q_OBJECT

public:
    MainWin(QWidget *parent = nullptr);

signals:
    // For the case you want to use signals
    void newRunnable(std::function<void()> func);

private:
    LambdaThread* m_lambdaThread = nullptr;
};

MainWin. cpp

#include "MainWin.h"
#include "LambdaThread.h"
#include <QFuture>
#include <QDebug>

MainWin::MainWin(QWidget *parent)
    : QMainWindow(parent)
{
    m_lambdaThread = new LambdaThread(this);

    connect(this, &MainWin::newRunnable,
        m_lambdaThread, &LambdaThread::setRunnable);

    /*
    Do not forget the third (`this`) context variable
    while using modern signal-slot connection syntax with lambdas
    */
    connect(m_lambdaThread, &LambdaThread::started,
        this, []()
    {
        qDebug() << "Runnable stated";
    });

    connect(m_lambdaThread, &LambdaThread::finished,
        this, []()
    {
        qDebug() << "Runnable finished";
    });

    // Set your lambda directly
    QFuture<void> future = m_lambdaThread->setRunnable([]()
    {
        qDebug() << "hello from threaded runnable";
    });

    // You can also use future (not necessary of course)
    //future.waitForFinished();

    // Or you can emit your lambda via the signal:
    emit newRunnable([]()
    {
        qDebug() << "hello from threaded runnable which comes from signal";
    });
}
...