Я использую PyQt для написания программного обеспечения для обработки потока данных, которое включает в себя редактор узлов (на основе проекта qtpynodeeditor). Каждый узел начнет вычисление сразу после получения доступа ко всем данным, проверки правильности данных и передачи результатов следующему узлу.
Чтобы сделать работу узла неблокируемой и вернуть значение правильно, я использовал многопроцессорный модуль в Python.
multiprocessing.freeze_support()
CryptoComputeThreadPool = Pool(8)
...
class DataComputeModel(NodeDataModel):
caption_visible = True
properties = None
port_caption_visible = True
data_type = StringData.data_type
computeEndedSig = pyqtSignal()
def __init__(self, module, *args, **kwargs):
super().__init__(*args, **kwargs)
self.computeEndedSig.connect(self._emit_results)
self._statusLabel = uni_Widget.ICTFELabel()
self._statusLabel.setText('?')
self._statusLabel.setMinimumWidth(20)
self._statusLabel.setAlignment(
QtCore.Qt.AlignCenter | QtCore.Qt.AlignVCenter)
@property
def caption(self):
return self.name
def embedded_widget(self):
return self._statusLabel
def out_data(self, port: int) -> NodeData:
'''
The output data as a result of this calculation
Parameters
----------
port : int
Returns
-------
value : NodeData
'''
try:
return copy(self.outputs[port])
except:
return None
def set_in_data(self, data: NodeData, port: Port):
'''
New data at the input of the node
Parameters
----------
data : NodeData
port_index : int
'''
self.inputs[port.index] = copy(data)
if self._check_inputs():
try:
self.compute()
except Exception as e:
traceback.print_exc()
else:
self._statusLabel.setText('×')
for i in self.outputs:
self.outputs[i] = None
for i in range(self.num_ports[PortType.output]):
self.data_updated.emit(i)
def _emit_results(self):
for i in range(self.num_ports[PortType.output]):
self.data_updated.emit(i)
def _check_inputs(self):
try:
for i in self.inputs:
if self.inputs[i].data_type != StringData.data_type:
return False
return True
except:
return False
def compute(self):
self._statusLabel.setText('...')
inp = {}
print(self.inputs[0].string)
for i in self.inputs:
try:
inp[i] = self.inputs[i].string.decode()
except:
if self.inputs[i].string is not None:
inp[i] = str(self.inputs[i].string)
else:
inp[i] = None
CryptoComputeThreadPool.apply_async(
self.func, args=(inp, self.settings), callback=self._compute_callback, error_callback=self._compute_error_callback)
def _compute_error_callback(self, error=None, *args, **kwargs):
print(error)
for i in self.outputs:
self.outputs[i] = None
for i in range(self.num_ports[PortType.output]):
self.data_updated.emit(i)
self._statusLabel.setText('×')
def _compute_callback(self, out):
out = copy(out)
for i in out:
self.outputs[i] = StringData(out[i])
self.computeEndedSig.emit()
self._statusLabel.setText('√')
Но будет проблема с этим. Если функция калькуляции является очень трудоемкой операцией, то я изменяю ввод в процессе калькуляции, он немедленно запустит другой процесс калькуляции, и исходный процесс калькуляции не будет завершен, но продолжит выполнение, пока процесс калькуляции не закончится.
Я хочу найти способ управления процессом, который может записывать каждый процесс в узле. Если узел хочет запустить другой процесс, он должен сначала завершить процесс, который в данный момент выполняется (если есть) на узле. Что мне делать?