Попытка заставить QProcess работать с очередью - PullRequest
0 голосов
/ 24 ноября 2018

Я пытаюсь запустить несколько процессов с очередью и получить вывод для всех процессов, используя QProcess, но у меня возникла пара проблем.Я использую QSpinBox для одновременной установки максимального числа процессов, и я могу заставить все работать нормально в главном потоке или если я запускаю цикл с процессами в QObject, но я не могу его получитьдля правильной работы в QThread.
Я знаю, что нет необходимости использовать потоки с QProcess, но с циклом у меня почти нет выбора.При запуске в главном потоке он на мгновение зависает до запуска процессов, и я предпочел бы, чтобы он работал более плавно.
Я получаю только ошибки при попытке запустить процессы в QThread, если не использую что-то вроде _process.waitForFinished(), нопроблема в том, что процессы запускаются только по одному за раз.
У кого-нибудь есть какие-либо предложения по правильной работе?В настоящее время я использую Pyside2, но ответ для Pyside2 или PyQt5 будет в порядке.Благодарю.

import queue
import sys
from PySide2.QtCore import QProcess, QTextCodec, QThread, Qt
from PySide2.QtWidgets import QApplication, QWidget, QSpinBox, \
    QPushButton, QVBoxLayout

class Window(QWidget):
    def __init__(self):
        QWidget.__init__(self)
        self.setAttribute(Qt.WA_DeleteOnClose, True)
        self.queue = queue.Queue()
        layout = QVBoxLayout(self)
        self.startBtn = QPushButton('Start', clicked=self.addToQueue)
        self.spinBox = QSpinBox(value=3)
        layout.addWidget(self.spinBox)
        layout.addWidget(self.startBtn)
        self.taskList = ['my.exe -value','my.exe -value','my.exe -value','my.exe -value',
                         'my.exe -value','my.exe -value','my.exe -value','my.exe -value']

    def addToQueue(self):
        for i in self.taskList:
            self.queue.put(i)
        self.sendToThread()

    def sendToThread(self):
        vals = {'max': self.spinBox.value()}
        self.taskThread = TaskThread(self.queue, vals)
        self.taskThread.start()

    def closeEvent(self, event):
        event.accept()

class TaskThread(QThread):
    def __init__(self, queue=None, vals=None, parent=None):
        QThread.__init__(self, parent)
        self.queue = queue
        self.vals = vals
        self.maxProcs = self.vals.get('max')
        self.procCount = 0

    def run(self):
        self.start_procs()

    def start_procs(self):
        while not self.queue.empty() and self.procCount < self.maxProcs:
            cmd = self.queue.get()
            _process = QProcess(self)
            _process.setProcessChannelMode(QProcess.MergedChannels)
            self.codec = QTextCodec.codecForLocale()
            self._decoder_stdout = self.codec.makeDecoder()
            _process.readyReadStandardOutput.connect(lambda process=_process: self._ready_read_standard_output(process))
            _process.started.connect(self.procStarted)
            _process.finished.connect(self.procFinished)
            _process.finished.connect(self.decreaseCount)
            _process.finished.connect(self.start_procs)
            _process.start(cmd)
            self.procCount += 1

    def _ready_read_standard_output(self, process):
        self.out = process.readAllStandardOutput()
        self.text = self._decoder_stdout.toUnicode(self.out)
        print(self.text)

    def decreaseCount(self):
        if self.procCount <= 0:
            pass
        else:
            self.procCount -= 1

    def procStarted(self):
        print('started')

    def procFinished(self):
        print('finished')

if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = Window()
    window.resize(200, 100)
    window.show()
    sys.exit(app.exec_())

1 Ответ

0 голосов
/ 25 ноября 2018

Когда вы запускаете процесс, это не означает, что он запущен, потому что у него могут быть проблемы с выполнением, поэтому при первом запуске лучше дождаться запуска процесса или не удалось запустить следующий процесс, выполняя требования, которыечисло запущенных процессов меньше максимального или больше не выполняет задачи или не запущено.

С другой стороны, я также реализовал задачу остановки, которая подразумевает, что больше задач не будет добавлено, нозадачи, которые выполнялись до остановки, будут продолжать выполняться.

Если вы измените максимальное значение на более низкое, то больше задач не будет выброшено, пока условие не будет выполнено.

С учетомвыше, нет необходимости использовать темы

import queue
from PySide2 import QtCore, QtGui, QtWidgets

class TaskManager(QtCore.QObject):
    messageChanged = QtCore.Signal(str)
    numbersTaskRunningChanged = QtCore.Signal(int)

    def __init__(self, parent=None):
        super(TaskManager, self).__init__(parent)
        self._max_task = 1
        self._queue = queue.Queue()
        self._numbers_task_running = 0
        self._running = False

    def setMaxTask(self, max_task):
        self._max_task = max_task
        if self._running:
            self.call_task()

    def maxTask(self):
        return self._max_task

    def appendTask(self, task):
        self._queue.put(task)
        self.call_task()

    def start(self):
        self._running = True
        self.call_task()

    def stop(self):
        self._running = False

    def call_task(self):
        if self._numbers_task_running < self.maxTask() and not self._queue.empty() and self._running:
            cmd = self._queue.get()
            process = QtCore.QProcess(self)
            process.setProcessChannelMode(QtCore.QProcess.MergedChannels)
            process.readyReadStandardOutput.connect(self.on_readyReadStandardOutput)
            process.finished.connect(self.on_finished)
            process.started.connect(self.on_started)
            process.errorOccurred.connect(self.on_errorOccurred)
            process.start(cmd)

    def on_readyReadStandardOutput(self):
        codec = QtCore.QTextCodec.codecForLocale()
        decoder_stdout = codec.makeDecoder()
        process = self.sender()
        text = decoder_stdout.toUnicode(process.readAllStandardOutput())
        self.messageChanged.emit(text)

    def on_errorOccurred(self, error):
        process = self.sender()
        print("error: ", error, "-", " ".join([process.program()] + process.arguments()))
        self.call_task()

    def on_finished(self):
        process = self.sender()
        self._numbers_task_running -= 1
        self.numbersTaskRunningChanged.emit(self._numbers_task_running)
        self.call_task()

    def on_started(self):
        process = self.sender()
        print("started: ", " ".join([process.program()] + process.arguments()))
        self._numbers_task_running += 1
        self.numbersTaskRunningChanged.emit(self._numbers_task_running)
        self.call_task()

class Widget(QtWidgets.QWidget):
    def __init__(self, parent=None):
        super(Widget, self).__init__(parent)
        self.setAttribute(QtCore.Qt.WA_DeleteOnClose, True)
        manager = TaskManager(self)
        task_list = # ...
        for task in task_list:
            manager.appendTask(task)

        button_start = QtWidgets.QPushButton("Start", clicked=manager.start)
        button_stop = QtWidgets.QPushButton("Stop", clicked=manager.stop)
        label = QtWidgets.QLabel("0", alignment=QtCore.Qt.AlignCenter)
        manager.numbersTaskRunningChanged.connect(label.setNum)
        spinBox = QtWidgets.QSpinBox()
        spinBox.valueChanged.connect(manager.setMaxTask)
        spinBox.setValue(3)
        textEdit = QtWidgets.QTextEdit()
        manager.messageChanged.connect(textEdit.append)

        lay = QtWidgets.QVBoxLayout(self)
        lay.addWidget(spinBox)
        lay.addWidget(button_start)
        lay.addWidget(button_stop)
        lay.addWidget(label)
        lay.addWidget(textEdit)

if __name__ == '__main__':
    import sys
    app = QtWidgets.QApplication(sys.argv)
    w = Widget()
    w.show()
    sys.exit(app.exec_())
...