Многопроцессное управление в Python с многопроцессорным управлением - PullRequest
1 голос
/ 10 июля 2020

У меня проблема с многопроцессорностью в Python. Мне нужно создать асинхронные c процессы, которые запускаются неопределенное время, и количество процессов также не определено. Как только поступает новый запрос, необходимо создать новый процесс со спецификациями из запроса. Мы используем ZeroMQ для обмена сообщениями. Существует также процесс, который запускается в начале и завершается только при завершении всего сценария.

Теперь я ищу решение, как я могу ожидать все процессы, имея при этом возможность добавлять дополнительные процессы.

asyncio.gather()

Это была моя первая идея, но ему нужен список процессов до его вызова.

class Object:
  def __init__(self, var):
     self.var = var

  async def run(self):
      *do async things*

class object_controller:
  
  def __init__(self):
     self.ctx = zmq.Context()
     self.socket = self.ctx.socket(zmq.PULL)
     self.socket.connect("tcp://127.0.0.1:5558")

     self.static_process = AStaticProcess()
     self.sp = aiomultiprocess.Process(target=self.static_process.run)
     self.sp.start()
     #here I need a good way to await this process


  def process(self, var):
    object = Object(var)
    process = aiomultiprocess.Process(target=object.run)
    process.start()
  
  def listener(self)
    while True:
      msg = self.socket.recv_pyobj()
      # here I need to find a way how I can start and await this process while beeing able to 
      # receive additional request, which result in additional processes which need to be awaited

Это некоторый код, который, надеюсь, объясняет мою проблему. Мне нужен своего рода коллектор, который ожидает процессы.

После инициализации нет взаимодействия между объектом и контроллером, только через zeroMQ (между процессом stati c и переменными процессами). Возврата тоже нет.

Ответы [ 2 ]

2 голосов
/ 10 июля 2020

Если вам нужно запустить процессы, одновременно ожидая новых, вместо явного вызова await, чтобы узнать, когда процессы завершаются sh, позвольте им выполняться в фоновом режиме, используя asyncio.create_task() . Это вернет объект Task, который имеет метод add_done_callback, который вы можете использовать для выполнения некоторой работы, когда процесс завершится:

class Object:
  def __init__(self, var):
     self.var = var

  async def run(self):
      *do async things*

class object_controller:
  
  def __init__(self):
     self.ctx = zmq.Context()
     self.socket = self.ctx.socket(zmq.PULL)
     self.socket.connect("tcp://127.0.0.1:5558")

     self.static_process = AStaticProcess()
     self.sp = aiomultiprocess.Process(target=self.static_process.run)
     self.sp.start()
     asyncio.create_task(self.sp.join() self.handle_proc_finished)


  def process(self, var):
    object = Object(var)
    process = aiomultiprocess.Process(target=object.run)
    process.start()
  
  def listener(self)
    while True:
      msg = self.socket.recv_pyobj()
      process = aiomultiprocess.Process(...)
      process.start()
      t = asyncio.create_task(process.join())
      t.add_done_callback(self.handle_other_proc_finished)

  def handle_proc_finished(self, task):
     # do something

  def handle_other_proc_finished(self, task):
    # do something else

Если вы не хотите использовать обратные вызовы, вы также можете передать create_task сопрограмму, которую вы определяете самостоятельно, которая ожидает завершения процесса sh и делает все, что нужно сделать после этого.

self.sp.start()
asyncio.create_task(wait_for_proc(self.sp))

async def wait_for_proc(proc):
   await proc.join()
   # do other stuff
0 голосов
/ 10 июля 2020

Вам необходимо создать список задач или будущий объект для процессов. Также вы не можете добавить процесс к событию l oop в ожидании других задач

...