После прочтения многочисленных постов я все еще не смог решить мою проблему (им не нужно снова читать Pipe / Queue в бесконечном цикле), так что:
У меня есть процесс, который перераспределяет команды для подпроцессов.Каждый процесс имеет бесконечный цикл, который ожидает переданные команды, затем выполняет некоторые действия и возвращает результаты.
Я пробовал и Pipe (), и Queue ().Оба работают, когда я использую from multiprocessing.dummy import Process, Lock, Queue
.
Когда я переключаюсь на from multiprocessing import Process, Lock, Queue
, версия Queue
оказывается заблокированной на command, args = self.pipe.get()
, а Pipe
вызывает EOFError на res = agent['process'].master.recv()
Мне не удалось воспроизвести ошибку Pipe
в следующем коде, но Queue
ведет себя так же.
Я использую Python 3.6.6 в Windows.
main
from testing.filesTesting.processCombination import ProcessCombination
if __name__=='__main__':
pc = ProcessCombination()
print('sending comm1')
r = pc._send_command('comm1')
print('result:', r)
ProcessCombination
from testing.filesTesting.subProcess import SubProcess
MULTIPROCESSING = True
if MULTIPROCESSING:
from multiprocessing import Process, Lock, Pipe, Queue
else:
from multiprocessing.dummy import Process, Lock, Pipe, Queue
class ProcessCombination():
def __init__(self):
# init processes
self.processes = []
for processID in range(0,3):
processArgs = self._get_ProcessArgs(processID)
process = SubProcess(**processArgs)
process.start()
self.processes.append({'process':process,'id':processID})
def _get_ProcessArgs(self, processID: int) -> dict:
# prepare process communication
# master, slave = Pipe()
master = Queue()
slave = Queue()
# return as dict
return {'processID': processID,
'master': master,
'slave': slave}
def _send_command(self, name, args: dict=None, await_response: bool = True):
# send command to all processes
for process in self.processes:
# process['process'].master.send((name, args))
process['process'].master.put((name, args))
# return response if required
return [self._rcv(process) for process in self.processes] if await_response else []
def _rcv(self, process):
# res = process['process'].master.recv()
res = process['process'].pipe.get()
return res
SubProcess
MULTIPROCESSING = True
if MULTIPROCESSING:
from multiprocessing import Process, Lock, Queue
else:
from multiprocessing.dummy import Process, Lock, Queue
import time
class SubProcess(Process):
def __init__(self, processID: int, master, slave):
super(Process,self).__init__(daemon=True)
self.master = master
self.pipe = slave
self._processID = processID
def start(self) -> None:
super(Process,self).start()
# self.pipe.close() # commented out when using queue
# time consuming class initialization -> time.sleep
time.sleep(2)
def run(self):
'''
Wait for command from pipe, and then perform selected action
'''
# self.master.close() # commented out when using queue
while True:
# command, args = self.pipe.recv()
command, args = self.master.get()
if command == 'comm1':
# do action
# ...
# self.pipe.send(True)
self.pipe.put('comm1 result')
# ...
elif command == 'close':
# closing ...
break