Я разрабатываю код конвейера с открытым исходным кодом, который позволяет пользователям создавать конвейеры перетаскивания в пользовательском интерфейсе на основе PyQt, которые затем запускаются с использованием пользовательского встроенного бэкэнда.
В настоящее время выполнение конвейера и Пользовательский интерфейс работает в том же потоке. Это приводит к зависанию пользовательского интерфейса во время выполнения конвейера. Я хочу переместить конвейер для запуска в отдельном потоке или процессе, а затем создать сигналы, которые подключаются к окну состояния в пользовательском интерфейсе. К сожалению, конвейер использует такие библиотеки, как MNE, которые используют matplotlib для построения графиков. Каждый раз, когда я пытался использовать многопоточность, я заканчивал тем, что графики отображались пустыми
Единственное найденное мною решение, которое надежно работает, - это использование класса подпроцесса, который создаст полностью независимый процесс для запуска конвейера и небольшое окно, сообщающее пользователю ждать (которое сразу же зависает), но это ограничивает мою способность общаться с существующим пользовательским интерфейсом и обновлять экран прогресса.
Вот код, который вызывает новый process
# Call pipeline python file with currently loaded filename
self.pipeline = subprocess.Popen(["python", "RunPipeline.py", self.nodz.currentFileName])
и вот код, который запускается (который затем запускает произвольный конвейер, разработанный пользователем)
from pipeline.NodeFactory import NodeFactory
from pipeline.NodzInterface import NodzInterface
from pipeline.Pipeline import Pipeline
from PyQt5.QtCore import *
from PyQt5 import QtWidgets
import traceback
import sys
def runPipeline(file):
try:
# Extract relevant info from the JSON
nodes, connections, globals = NodzInterface.load(file)
# Build the pipeline graph
pipeline = Pipeline(global_vars = globals)
for node in nodes:
pipeline.add(node[1])
for conn in connections:
pipeline.connect(parent = conn[0], child = conn[1])
pipeline.start()
# Catches any runtime errors and prints to console
# Lets you debug the pipeline nodes if they crash
except Exception:
traceback.print_exc()
class PipelineWindow(QtWidgets.QMainWindow):
def __init__(self):
QtWidgets.QMainWindow.__init__(self)
self.setWindowTitle("Running Pipeline")
lb = QtWidgets.QLabel("Please wait")
self.setCentralWidget(lb)
if __name__ == "__main__":
app = QtWidgets.QApplication([])
window = PipelineWindow()
window.show()
runPipeline(sys.argv[1])
app.exec_()
Хуки должны быть добавлены позже, но будут прикреплены к объект конвейера.
Трудно полностью объяснить код для этого без публикации дюжины или около того файлов, но конвейер динамически импортирует и запускает классы узлов, определенные в других файлах.
Вот пример узла, используемого для обработки данных
class fitICA(Node):
def __init__(self, name, params):
super(fitICA, self).__init__(name, params)
# Convert parameters set to None from text to Nonetype
if self.parameters["skew"] == "None":
self.parameters["skew"] = None
if self.parameters["kurt"] == "None":
self.parameters["kurt"] = None
if self.parameters["var"] == "None":
self.parameters["var"] = None
def process(self):
data = self.args["Raw/Epoch"]
randomState = None
if self.parameters["randomState"] == True:
randomState = 1
ica = mne.preprocessing.run_ica(data,
n_components=None,
max_pca_components=None,
random_state=randomState,
start=self.parameters["start"],
stop=self.parameters["stop"],
ecg_ch=None, # Need to replace with autofill data taken from channel list
skew_criterion=self.parameters["skew"],
kurt_criterion=self.parameters["kurt"],
var_criterion=self.parameters["var"],
method=self.parameters["method"])
return {"ICA Solution" : ica}
, и другой, который создает нужные нам графики
class plotSources(Node):
def __init__(self, name, params):
super(plotSources, self).__init__(name, params)
if self.parameters["saveGraph"] is not None:
assert(self.parameters["saveGraph"] is not ""), "ERROR: Plot Sources node set to save but no filename has been given. Please update the node settings and re-run"
def process(self):
inst = self.args["Data"]
ica = self.args["ICA Solution"]
fig = ica.plot_sources(inst, show = False)
if self.parameters["saveGraph"] is not None:
if "globalSaveStart" in self.parameters.keys():
f = self.parameters["globalSaveStart"] + self.global_vars["Output Filename"] + self.parameters["globalSaveEnd"]
else:
f = self.parameters["saveGraph"]
type = f.split(".")[-1]
if type == "png":
fig.savefig(f, format = "png")
elif type == "pdf":
fig.savefig(f, format = "pdf")
elif type == "pkl":
pickle.dump(fig, open(f, "wb"))
if self.parameters["showGraph"] == True:
fig.show()
else:
plt.close(fig)
Этим узлам может потребоваться несколько минут для завершения их вычисления поэтому обновление пользовательского интерфейса между вызовами узлов и его зависание во время их работы не является жизнеспособным вариантом для меня. threading
библиотеки, выберите графики во временный каталог (специально для графиков), а затем, как только конвейер завершит свою работу, эти файлы загрузятся, отобразятся графики и затем будут удалены временные файлы.
Если кто-то имел предложение, которое позволило бы мне обойти сохранение / загрузку, тогда я хотел бы услышать это