Блокировка очереди в процессе Python - PullRequest
0 голосов
/ 28 сентября 2019

После прочтения многочисленных постов я все еще не смог решить мою проблему (им не нужно снова читать Pipe / Queue в бесконечном цикле), так что:

У меня есть процесс, который перераспределяет команды для подпроцессов.Каждый процесс имеет бесконечный цикл, который ожидает переданные команды, затем выполняет некоторые действия и возвращает результаты.

Я пробовал и Pipe (), и Queue ().Оба работают, когда я использую from multiprocessing.dummy import Process, Lock, Queue.

Когда я переключаюсь на from multiprocessing import Process, Lock, Queue, версия Queue оказывается заблокированной на command, args = self.pipe.get(), а Pipe вызывает EOFError на res = agent['process'].master.recv()

Мне не удалось воспроизвести ошибку Pipe в следующем коде, но Queue ведет себя так же.

Я использую Python 3.6.6 в Windows.

main

from testing.filesTesting.processCombination import ProcessCombination

if __name__=='__main__':
    pc = ProcessCombination()
    print('sending comm1')
    r = pc._send_command('comm1')
    print('result:', r)

ProcessCombination

from testing.filesTesting.subProcess import SubProcess

MULTIPROCESSING = True

if MULTIPROCESSING:
    from multiprocessing import Process, Lock, Pipe, Queue
else:
    from multiprocessing.dummy import Process, Lock, Pipe, Queue


class ProcessCombination():
    def __init__(self):
        # init processes
        self.processes = []
        for processID in range(0,3):    
            processArgs = self._get_ProcessArgs(processID)
            process = SubProcess(**processArgs)
            process.start()
            self.processes.append({'process':process,'id':processID})

    def _get_ProcessArgs(self, processID: int) -> dict:
        # prepare process communication
#        master, slave = Pipe()
        master = Queue()
        slave = Queue()
        # return as dict
        return {'processID': processID,
                'master': master,
                'slave': slave}


    def _send_command(self, name, args: dict=None, await_response: bool = True):
        # send command to all processes
        for process in self.processes:
#            process['process'].master.send((name, args))
            process['process'].master.put((name, args))
        # return response if required
        return [self._rcv(process) for process in self.processes] if await_response else []

    def _rcv(self, process):
#        res = process['process'].master.recv()
        res = process['process'].pipe.get()
        return res

SubProcess

MULTIPROCESSING = True

if MULTIPROCESSING:
    from multiprocessing import Process, Lock, Queue
else:
    from multiprocessing.dummy import Process, Lock, Queue

import time

class SubProcess(Process): 
    def __init__(self, processID: int, master, slave):       
        super(Process,self).__init__(daemon=True)
        self.master = master
        self.pipe = slave
        self._processID = processID


    def start(self) -> None:
        super(Process,self).start() 
#        self.pipe.close() # commented out when using queue
#        time consuming class initialization -> time.sleep
        time.sleep(2)


    def run(self):
        '''
        Wait for command from pipe, and then perform selected action
        '''
#        self.master.close() # commented out when using queue
        while True:
#            command, args = self.pipe.recv()
            command, args = self.master.get()

            if command == 'comm1':
                # do action
                # ...
#                self.pipe.send(True)
                self.pipe.put('comm1 result')
            # ...
            elif command == 'close':
                # closing ...
                break
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...