Асинхронные задачи блокируют поток графического интерфейса, поэтому их следует выполнять в отдельном потоке, а результаты передавать в графический интерфейс через сигналы. Это также позволяет отделить бизнес-логику от графического интерфейса и сделать код чище:
get_module.py
# coding: utf8
#======================= IMPORT AREA =======================
#here's where you import any module needed for the website
from random import randint
#===========================================================
#====================== SETTINGS AREA ======================
#here's where you declare the settings of the website such
#as method, error key, success key, custom settings, etc...
# Display name of this site configuration.
name = "GET Test Config"

# HTTP method used for the request: GET, POST or CUSTOM.
method = 'GET'

# Status codes (as strings) that mark a response as failed.
error = [
    # 4xx client errors
    '400', '401', '402', '403', '404', '405', '406', '407', '408', '409',
    '410', '411', '412', '413', '414', '415', '416', '417', '418',
    '421', '422', '423', '424', '425', '426', '428', '429', '431', '451',
    # 5xx server errors
    '500', '501', '502', '503', '504', '505', '506', '507', '508',
    '510', '511',
]

# Value handed to the HTTP session as its connection limit.
connections = 5
#===========================================================
#===================== DEFINITION AREA =====================
#here's where the definitions are made.
#There are 2 defs:
#def request(): returns the url (with or without modifications)
#def settings(): returns the data to send with the request (query params for GET, form data for POST)
#====== SETTINGS AREA ======
def request():
    """Return the URL that the request is sent to."""
    return "https://httpbin.org/anything"
def settings():
    """Return the data sent along with the request as a new dict."""
    return {'example': 'example'}
#===========================================================
main.py
import importlib
import multio
import qdarkstyle
import trio
from PyQt5 import QtCore, QtWidgets
from asks import Session
# Load the site configuration module (get_module.py) by name at import time.
# TaskWorker reads its `method`, `error` and `connections` values and calls
# its `request()` and `settings()` functions.
module = importlib.import_module("get_module")
class TaskWorker(QtCore.QObject):
    """Run the configured HTTP checks and publish counters via Qt signals.

    The three counters (`total`, `valid`, `invalid`) are exposed as Qt
    properties; assigning a new value emits the matching ``*Changed`` signal
    so a GUI can mirror the numbers without touching the request logic.
    """

    totalChanged = QtCore.pyqtSignal(int)
    validChanged = QtCore.pyqtSignal(int)
    invalidChanged = QtCore.pyqtSignal(int)

    # Number of sequential rounds and concurrent requests per round.
    ROUNDS = 40
    WORKERS_PER_ROUND = 2

    # HTTP methods (lower-case) that check() knows how to send.
    SUPPORTED_METHODS = ("get", "post")

    def __init__(self, parent=None):
        super(TaskWorker, self).__init__(parent)
        self._total = 0
        self._valid = 0
        self._invalid = 0

    @QtCore.pyqtProperty(int, notify=totalChanged)
    def total(self):
        """Number of responses received so far."""
        return self._total

    @total.setter
    def total(self, value):
        # Only notify listeners when the value actually changes.
        if self._total == value:
            return
        self._total = value
        self.totalChanged.emit(self._total)

    @QtCore.pyqtProperty(int, notify=validChanged)
    def valid(self):
        """Number of responses whose status code is not in `module.error`."""
        return self._valid

    @valid.setter
    def valid(self, value):
        if self._valid == value:
            return
        self._valid = value
        self.validChanged.emit(self._valid)

    @QtCore.pyqtProperty(int, notify=invalidChanged)
    def invalid(self):
        """Number of responses whose status code is listed in `module.error`."""
        return self._invalid

    @invalid.setter
    def invalid(self, value):
        if self._invalid == value:
            return
        self._invalid = value
        self.invalidChanged.emit(self._invalid)

    @QtCore.pyqtSlot()
    def check(self):
        """Send the configured requests and update the counters.

        Runs ``ROUNDS`` rounds; each round starts ``WORKERS_PER_ROUND``
        concurrent requests inside a trio nursery and waits for them to
        finish. The call blocks until ``trio.run`` returns.

        Raises:
            ValueError: if ``module.method`` is neither GET nor POST.
        """
        method = module.method.lower()
        # Fail fast with a clear message instead of hitting an unbound
        # response variable once the first request coroutine runs.
        if method not in self.SUPPORTED_METHODS:
            raise ValueError(
                "Unsupported method %r; expected GET or POST" % (module.method,)
            )

        async def worker(session):
            # Resolve the URL once so the logged URL is exactly the one that
            # was requested, even if module.request() varies between calls.
            url = module.request()
            payload = module.settings()
            if method == "get":
                response = await session.get(url, params=payload)
            else:
                response = await session.post(url, data=payload)

            # module.error holds status codes as strings.
            if str(response.status_code) in module.error:
                print("BAD -- " + url)
                self.total += 1
                self.invalid += 1
            else:
                print("GOOD -- " + url)
                self.total += 1
                self.valid += 1
            print(response.text)

        async def example():
            session = Session(connections=module.connections)
            for _ in range(self.ROUNDS):
                # The nursery block exits only after all its workers finish.
                async with trio.open_nursery() as nursery:
                    for _ in range(self.WORKERS_PER_ROUND):
                        nursery.start_soon(worker, session)

        multio.init("trio")
        trio.run(example)
class Menu(QtWidgets.QMainWindow):
    """Main window that shows the valid, invalid and total response counters.

    A TaskWorker is created, moved to a QThread started by this window, and
    its ``*Changed`` signals are connected to the label-updating slots below.
    """

    def __init__(self):
        super().__init__()
        self.setWindowTitle("Status Codes")

        self.central_widget = QtWidgets.QWidget()
        self.setCentralWidget(self.central_widget)
        layout = QtWidgets.QVBoxLayout(self.central_widget)
        self.resize(500, 350)

        def centered_label(markup):
            # All three counters share the same horizontal centering.
            return QtWidgets.QLabel(markup, alignment=QtCore.Qt.AlignHCenter)

        # NOTE(review): the initial colors of the "400" and "Total" labels
        # differ from the colors used by the update slots -- confirm intent.
        self.ok = centered_label("200: <font color='green'>0</font>")
        self.bad = centered_label("400: <font color='yellow'>0</font>")
        self.total = centered_label("Total: <font color='#00FF00'>0</font>")
        for counter_label in (self.ok, self.bad, self.total):
            layout.addWidget(counter_label)

        # NOTE(review): nothing in this class quits or waits for the thread.
        worker_thread = QtCore.QThread(self)
        worker_thread.start()

        self.worker = TaskWorker()
        self.worker.moveToThread(worker_thread)

        wiring = (
            (self.worker.totalChanged, self.updateTotal),
            (self.worker.validChanged, self.updateValid),
            (self.worker.invalidChanged, self.updateInvalid),
        )
        for signal, slot in wiring:
            signal.connect(slot)

        # Schedule the check through a zero-delay single-shot timer.
        QtCore.QTimer.singleShot(0, self.worker.check)

    @QtCore.pyqtSlot(int)
    def updateTotal(self, total):
        """Show the new total count."""
        markup = "Total: <font color='#F40D30'>%s</font>" % total
        self.total.setText(markup)

    @QtCore.pyqtSlot(int)
    def updateValid(self, valid):
        """Show the new count of valid responses."""
        markup = "200: <font color='green'>%s</font>" % valid
        self.ok.setText(markup)

    @QtCore.pyqtSlot(int)
    def updateInvalid(self, invalid):
        """Show the new count of invalid responses."""
        markup = "400: <font color='#00FF00'>%s</font>" % invalid
        self.bad.setText(markup)
if __name__ == "__main__":
    import sys

    # Create the application, apply the dark stylesheet, then show the window.
    application = QtWidgets.QApplication(sys.argv)
    dark_sheet = qdarkstyle.load_stylesheet_pyqt5()
    application.setStyleSheet(dark_sheet)

    window = Menu()
    window.show()

    # Use the Qt event loop's return value as the process exit code.
    exit_code = application.exec_()
    sys.exit(exit_code)