Синхронизация многопроцессных очередей с asyncio - PullRequest
0 голосов
/ 18 июня 2019

Я хочу собрать данные из циклов asyncio, запущенных в одноуровневых процессах с Python 3.7

В идеале я бы использовал multiprocess.JoinableQueue, передавая его join() вызов для синхронизации.

Однако его примитивы синхронизации полностью блокируют цикл обработки событий (см. Мой частичный ответ ниже для примера).

Иллюстративный прототип:

class MP_GatherDict(dict):
    '''A per-process dictionary which can be gathered from a single one'''
    def __init__(self):
        self.q = multiprocess.JoinableQueue()
        super().__init__()

    async def worker_process_server(self):
        while True:
            (await?) self.q.put(dict(self)) # Put a shallow copy
            (await?) self.q.join() # Wait for it to be gathered

    async def gather(self):
        all_dicts = []
        while not self.q.empty():
            all_dicts.append(await self.q.get())
            self.q.task_done()
        return all_dicts

Обратите внимание, чтоПоток put->get->join->put может работать не так, как ожидалось, но на самом деле этот вопрос касается использования multiprocess примитивов в asyncio цикле событий ...

Тогда возникает вопрос как лучше await дляmultiprocess примитивы из цикла событий asyncio?

1 Ответ

0 голосов
/ 18 июня 2019

Этот тест показывает, что multiprocess.Queue.get() блокирует весь цикл событий:

mp_q = mp.JoinableQueue()
async def mp_queue_wait():
    try:
        print('Queue:',mp_q.get(timeout=2))
    except Exception as ex:
        print('Queue:',repr(ex))

async def main_loop_task():
    task = asyncio.get_running_loop().create_task(mp_queue_wait())
    for i in range(3):
        print(i, os.times())
        await asyncio.sleep(1)
    await task
    print(repr(task))

asyncio.run(main_loop_task())

Чей вывод:

0 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208620.18)
Queue: Empty()
1 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208622.18)
2 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208623.18)
<Task finished coro=<mp_queue_wait() done,...> result=None>

Итак, я смотрю на asyncio.loop.run_in_executor () в качестве следующего возможного ответа, однако порождение исполнителя / потока только для этого кажется излишним ...

Вот тот же тест с использованием исполнителя по умолчанию:

async def mp_queue_wait():
    try:
        result = await asyncio.get_running_loop().run_in_executor(None,mp_q.get,True,2)
    except Exception as ex:
        result = ex
    print('Queue:',repr(result))
    return result 

И (желаемый) результат:

0 posix.times_result(user=0.36, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210674.65)
1 posix.times_result(user=0.37, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210675.65)
Queue: Empty()
2 posix.times_result(user=0.37, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210676.66)
<Task finished coro=<mp_queue_wait() done, defined at /home/apozuelo/Documents/5G_SBA/Tera5G/services/db.py:211> result=Empty()>
...