Окончательное решение пришло, когда я решил попробовать класс QTimer
. Возможно, есть более оптимизированные решения, но этот, кажется, ставит все флажки, даже если он очень прост.
import random
import time
import traceback
from functools import partial
from PyQt5.QtCore import *
from PyQt5.QtGui import QFont
from PyQt5.QtWidgets import *
class WorkerSignals(QObject):
"""
Represents the signals a Worker can emit.
"""
finished = pyqtSignal()
starting = pyqtSignal(int) # ID of thread
result = pyqtSignal(tuple) # Tuple refresh result, result and ID
class Worker(QRunnable):
"""
A worker designed to tell when it's starting, when it's finished and the result.
Designed to work around Thread.refresh().
"""
def __init__(self, fn, thread_id, *args, **kwargs):
super(Worker, self).__init__()
# Store constructor arguments (re-used for processing)
self.fn = fn
self.id = thread_id
self.args = args
self.kwargs = kwargs
self.signals = WorkerSignals()
@pyqtSlot()
def run(self):
"""
Runs a given method, and emits the result with the Worker's coordinated ID.
"""
try:
self.signals.starting.emit(self.id) # Thread is now finally ready to work.
result = self.fn(*self.args, **self.kwargs) # Refresh Thread!
self.signals.result.emit(result) # Thread is finished, emit result tuple.
except:
traceback.print_exc()
finally:
self.signals.finished.emit() # Done
class Thread(object):
"""
Basic Rules for a Thread Object:
Cannot store the next timestamp on the object (it's a database object, I don't believe it's good practice
to be creating sessions over and over to simply read/write the access time.
ID and Active are allowed as booleans.
"""
i = -1
def __init__(self):
self.id = Thread.nextID()
self.active = True
self.refreshes = 0
def refresh(self) -> tuple:
"""
'Refreshes' a thread. Waits a specific period, then decides whether Thread object should be deactivated or
returned from additional refreshes. Chance of deactivation lowers with each refresh.
:return: The refresh result, a tuple with a boolean and the thread's ID (for identifying it later)
"""
# Represents my SQL Alchemy Model's refresh() function
self.refreshes += 1
time.sleep(random.randint(2, 5))
if random.random() <= max(0.1, 1.0 - ((self.refreshes + 5) / 10)):
self.active = False
return self.active, self.id
@staticmethod
def getRefreshTime() -> float:
"""
Represents the amount of time before a thread should be refreshed.
Should NOT be used to determine whether the thread is still active or not.
:return: The time period that should be waited.
"""
return random.uniform(10, 300)
@staticmethod
def nextID() -> int:
"""
Returns integer thread IDs in sequence to remove possibility of duplicate IDs.
:return: Integer Thread ID
"""
Thread.i += 1
return Thread.i
def __repr__(self):
return f'Thread(id={self.id} active={self.active})'
class MainWindow(QMainWindow):
"""
GUI containing a Label, Button and ListWidget showing all the active sleeping/working threads.
Manages a threadpool, a number of background singleshot timers, etc.
"""
def __init__(self, *args, **kwargs):
super(MainWindow, self).__init__(*args, **kwargs)
# Widgets Setup
layout = QVBoxLayout()
self.list = QListWidget()
self.l = QLabel("Total Active: 0")
self.button = QPushButton("Refresh List")
self.button.pressed.connect(self.refreshList)
self.button.setDisabled(True)
layout.addWidget(self.l)
layout.addWidget(self.button)
layout.addWidget(self.list)
w = QWidget()
w.setLayout(layout)
self.setCentralWidget(w)
self.show()
# Periodically add threads to the pool.
self.poolTimer = QTimer()
self.poolTimer.setInterval(5_000)
self.poolTimer.timeout.connect(self.addThreads)
# Threading Setup
self.threadpool = QThreadPool()
print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())
self.active, self.threads = {}, {}
# Add a number of threads to start with.
for _ in range(random.randint(5, 16)):
self.setupThread(Thread())
self.poolTimer.start()
def refreshList(self):
"""
Refreshes the ListWidget in the GUI with all the active/sleeping/working threads.
"""
self.list.clear()
bold = QFont()
bold.setBold(True)
active = 0
for thread in self.threads.values():
item = QListWidgetItem(
f'Thread {thread.id}/{thread.refreshes}')
# Bold a thread if it's working
if self.active[thread.id]:
active += 1
item.setFont(bold)
self.list.addItem(item)
self.l.setText(f'Total Active: {active}/{len(self.threads)}')
def refreshResult(self, result) -> None:
"""
When a thread is finished, the result determines it's next course of action, which is either
to return to the pool again, or delete itself.
:param result: A tuple containing the result (bool) and the connected Thread ID.
"""
self.active[result[1]] = False
if result[0]:
print(f'Restarting Thread {result[1]}')
self.setupThread(self.threads[result[1]]) # Add by ID, which would normally be a database GET
else:
print(f'Thread {result[1]} shutting down.')
del self.active[result[1]]
del self.threads[result[1]]
self.refreshList()
def updateActivity(self, thread_id) -> None:
"""
Connected to the starting signal, helps signal when a thread is actually being refreshed.
:param thread_id: The Thread ID
"""
print(f'Thread {thread_id} is now active/working.')
self.active[thread_id] = True
def refresh(self, thread):
"""
Adds a new worker to the threadpool to be refreshed.
Can't be considered a real start to the thread.refresh function, as the pool has a max of 12 workers at any time.
The 'starting' signal can tell us when a specific thread is actually being refreshed, and is represented
as a Bold element in the list.
:param thread: A thread instance.
"""
print(f'Adding Thread {thread.id} to the pool.')
worker = Worker(thread.refresh, thread_id=thread.id)
worker.signals.result.connect(self.refreshResult)
worker.signals.starting.connect(self.updateActivity)
self.threadpool.start(worker)
# self.active[thread.id] = True
self.refreshList()
def setupThread(self, thread) -> None:
"""
Adds a new timer designated to start a specific thread.
:param thread: A thread instance.
"""
self.active[thread.id] = False
self.threads[thread.id] = thread
t = QTimer()
period = thread.getRefreshTime()
t.singleShot(period * 1000, partial(self.refresh, thread=thread))
print(f'Thread {thread.id} will start in {period} seconds.')
self.refreshList()
def addThreads(self):
"""
Adds a number of threads to the pool. Called automatically every couple seconds.
"""
add = max(0, 30 + random.randint(-5, 5) - len(self.threads))
if add > 0:
print(f'Adding {add} thread{"s" if add > 1 else ""}.')
for _ in range(add):
self.setupThread(Thread())
app = QApplication([])
window = MainWindow()
app.exec_()
Когда запрашивается поток, создается таймер и запускается singleShot
дополнительная функция, которая добавит его в пул потоков. Этот пул потоков может обрабатывать до 12 обновляемых непрерывных «обновляющих» потоков, а сигналы позволяют GUI обновлять момент обнаружения изменения.
Тысячи объектов «Поток» могут ждать, и кажется, singleShot
способен добавлять их в пул именно тогда, когда это необходимо.
Сигналы помогают различать, когда поток имеет значение sleeping
, working
и active
(но inactive
Объекты потока немедленно удаляются ).
Единственное предупреждение, которое я могу придумать с этой программой:
1) Может ли реализация QThread превзойти его?
2) Что происходит с QTimer
как только функция singleshot
будет выполнена и запущена? Будут ли они правильно G C 'd, или тысячи могут накапливаться в фоновом режиме, потребляя ресурсы?