Как долго спать с потоками PyQt? - PullRequest
0 голосов
/ 01 апреля 2020

У меня есть ряд определенных объектов, которые должны запускать определенную c функцию с указанными c постоянно меняющимися интервалами, снова и снова, пока они не решат, что они сделаны.

Например, одному объекту может понадобиться подождать 30 секунд, запустить, подождать 60 секунд, запустить, подождать 10 секунд, запустить ... Вы получаете точку, и это может продолжаться для 30-120 различных объектов, выполняющих точно такую ​​же функцию .

Я думал, что просто наличие функции, которая спит для точного интервала, решит мою проблему, но, поправьте меня, если я ошибаюсь, я вспомнил, что пулы потоков могут запускать только определенное число потоков в любой момент времени (12 для меня). Как мне обойти этот предел?

class Thing(object):
    def getCurrentPeriod(self):
        return random.randint(5, 30) # Some ever changing period of time

    def refresh(self):
        doThings() # A long running task that is disk and network intensive

    def waitRefresh(self):
        period = self.getCurrentPeriod()
        time.sleep(period) # Wait that period out
        self.refresh()
        return self.needRefresh()
        # Boolean if it needs to restart - Not sure about how to reschedule,  
        # or specifically where to connect the worker emit when it finishes 
        # to make sure this *specific* Thing obj gets it's waitRefresh func called again.

class App(QMainWindow):
    def __init__(self, *args, **kwargs):
    super(MainWindow, self).__init__(*args, **kwargs)

    self.threadpool = QThreadPool()

    # Add initial objects to pool (other portions of app may add more over time)
    for thing in self.acquireThings():
        worker = Worker(thing.waitRefresh)
        self.threadpool.start(worker)

Не включает ни класс WorkerSignals, ни подкласс QRunnable, этот пример включает то, что я обычно делаю. Пример решает ту же проблему, но (скорее всего) неэффективно.

edit: Новый пример с полным рабочим примером того, как time.sleep не останавливает поток и не позволяет другие на работу. Я чувствую, что async может быть единственной реализацией, но есть ли быстрое решение, чтобы мне не пришлось изменять все мое приложение?

Вот как это выглядит, когда вы пытаетесь больше спать чем 12 потоков.

1 Ответ

0 голосов
/ 02 апреля 2020

Окончательное решение пришло, когда я решил попробовать класс QTimer. Возможно, есть более оптимизированные решения, но этот, кажется, ставит все флажки, даже если он очень прост.

import random
import time
import traceback

from functools import partial
from PyQt5.QtCore import *
from PyQt5.QtGui import QFont
from PyQt5.QtWidgets import *


class WorkerSignals(QObject):
    """
    Represents the signals a Worker can emit.
    """
    finished = pyqtSignal()
    starting = pyqtSignal(int) # ID of thread
    result = pyqtSignal(tuple) # Tuple refresh result, result and ID

class Worker(QRunnable):
    """
    A worker designed to tell when it's starting, when it's finished and the result.
    Designed to work around Thread.refresh().
    """

    def __init__(self, fn, thread_id, *args, **kwargs):
        super(Worker, self).__init__()

        # Store constructor arguments (re-used for processing)
        self.fn = fn
        self.id = thread_id
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        """
        Runs a given method, and emits the result with the Worker's coordinated ID.
        """
        try:
            self.signals.starting.emit(self.id) # Thread is now finally ready to work.
            result = self.fn(*self.args, **self.kwargs) # Refresh Thread!
            self.signals.result.emit(result) # Thread is finished, emit result tuple.
        except:
            traceback.print_exc()
        finally:
            self.signals.finished.emit()  # Done


class Thread(object):
    """
    Basic Rules for a Thread Object:
    Cannot store the next timestamp on the object (it's a database object, I don't believe it's good practice
    to be creating sessions over and over to simply read/write the access time.
    ID and Active are allowed as booleans.
    """
    i = -1

    def __init__(self):
        self.id = Thread.nextID()
        self.active = True
        self.refreshes = 0

    def refresh(self) -> tuple:
        """
        'Refreshes' a thread. Waits a specific period, then decides whether Thread object should be deactivated or
        returned from additional refreshes. Chance of deactivation lowers with each refresh.
        :return: The refresh result, a tuple with a boolean and the thread's ID (for identifying it later)
        """

        # Represents my SQL Alchemy Model's refresh() function
        self.refreshes += 1
        time.sleep(random.randint(2, 5))
        if random.random() <= max(0.1, 1.0 - ((self.refreshes + 5) / 10)):
            self.active = False
        return self.active, self.id

    @staticmethod
    def getRefreshTime() -> float:
        """
        Represents the amount of time before a thread should be refreshed.
        Should NOT be used to determine whether the thread is still active or not.

        :return: The time period that should be waited.
        """

        return random.uniform(10, 300)

    @staticmethod
    def nextID() -> int:
        """
        Returns integer thread IDs in sequence to remove possibility of duplicate IDs.
        :return: Integer Thread ID
        """
        Thread.i += 1
        return Thread.i

    def __repr__(self):
        return f'Thread(id={self.id} active={self.active})'


class MainWindow(QMainWindow):
    """
    GUI containing a Label, Button and ListWidget showing all the active sleeping/working threads.
    Manages a threadpool, a number of background singleshot timers, etc.
    """

    def __init__(self, *args, **kwargs):
        super(MainWindow, self).__init__(*args, **kwargs)

        # Widgets Setup
        layout = QVBoxLayout()
        self.list = QListWidget()
        self.l = QLabel("Total Active: 0")
        self.button = QPushButton("Refresh List")
        self.button.pressed.connect(self.refreshList)
        self.button.setDisabled(True)
        layout.addWidget(self.l)
        layout.addWidget(self.button)
        layout.addWidget(self.list)
        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)
        self.show()

        # Periodically add threads to the pool.
        self.poolTimer = QTimer()
        self.poolTimer.setInterval(5_000)
        self.poolTimer.timeout.connect(self.addThreads)

        # Threading Setup
        self.threadpool = QThreadPool()
        print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())

        self.active, self.threads = {}, {}
        # Add a number of threads to start with.
        for _ in range(random.randint(5, 16)):
            self.setupThread(Thread())
        self.poolTimer.start()

    def refreshList(self):
        """
        Refreshes the ListWidget in the GUI with all the active/sleeping/working threads.
        """
        self.list.clear()
        bold = QFont()
        bold.setBold(True)

        active = 0
        for thread in self.threads.values():
            item = QListWidgetItem(
                f'Thread {thread.id}/{thread.refreshes}')
            # Bold a thread if it's working
            if self.active[thread.id]:
                active += 1
                item.setFont(bold)
            self.list.addItem(item)
        self.l.setText(f'Total Active: {active}/{len(self.threads)}')

    def refreshResult(self, result) -> None:
        """
        When a thread is finished, the result determines it's next course of action, which is either
        to return to the pool again, or delete itself.

        :param result: A tuple containing the result (bool) and the connected Thread ID.
        """
        self.active[result[1]] = False
        if result[0]:
            print(f'Restarting Thread {result[1]}')
            self.setupThread(self.threads[result[1]]) # Add by ID, which would normally be a database GET
        else:
            print(f'Thread {result[1]} shutting down.')
            del self.active[result[1]]
            del self.threads[result[1]]
        self.refreshList()

    def updateActivity(self, thread_id) -> None:
        """
        Connected to the starting signal, helps signal when a thread is actually being refreshed.

        :param thread_id: The Thread ID
        """
        print(f'Thread {thread_id} is now active/working.')
        self.active[thread_id] = True

    def refresh(self, thread):
        """
        Adds a new worker to the threadpool to be refreshed.
        Can't be considered a real start to the thread.refresh function, as the pool has a max of 12 workers at any time.
        The 'starting' signal can tell us when a specific thread is actually being refreshed, and is represented
        as a Bold element in the list.

        :param thread: A thread instance.
        """
        print(f'Adding Thread {thread.id} to the pool.')
        worker = Worker(thread.refresh, thread_id=thread.id)
        worker.signals.result.connect(self.refreshResult)
        worker.signals.starting.connect(self.updateActivity)
        self.threadpool.start(worker)
        # self.active[thread.id] = True
        self.refreshList()

    def setupThread(self, thread) -> None:
        """
        Adds a new timer designated to start a specific thread.
        :param thread: A thread instance.
        """
        self.active[thread.id] = False
        self.threads[thread.id] = thread
        t = QTimer()
        period = thread.getRefreshTime()
        t.singleShot(period * 1000, partial(self.refresh, thread=thread))
        print(f'Thread {thread.id} will start in {period} seconds.')
        self.refreshList()

    def addThreads(self):
        """
        Adds a number of threads to the pool. Called automatically every couple seconds.
        """

        add = max(0, 30 + random.randint(-5, 5) - len(self.threads))
        if add > 0:
            print(f'Adding {add} thread{"s" if add > 1 else ""}.')
            for _ in range(add):
                self.setupThread(Thread())

app = QApplication([])
window = MainWindow()
app.exec_()

Когда запрашивается поток, создается таймер и запускается singleShot дополнительная функция, которая добавит его в пул потоков. Этот пул потоков может обрабатывать до 12 обновляемых непрерывных «обновляющих» потоков, а сигналы позволяют GUI обновлять момент обнаружения изменения.

Тысячи объектов «Поток» могут ждать, и кажется, singleShot способен добавлять их в пул именно тогда, когда это необходимо.

Сигналы помогают различать, когда поток имеет значение sleeping, working и active (но inactive Объекты потока немедленно удаляются ).

Единственное предупреждение, которое я могу придумать с этой программой:

1) Может ли реализация QThread превзойти его?

2) Что происходит с QTimer как только функция singleshot будет выполнена и запущена? Будут ли они правильно G C 'd, или тысячи могут накапливаться в фоновом режиме, потребляя ресурсы?

...