Как совместить Python Asyncio и многопроцессорность? - PullRequest
0 голосов
/ 09 июля 2019

У меня есть устройство, которое нуждается в многопроцессорной обработке для обработки десериализации и декодирования входящих данных с помощью процессора;но остальная часть приложения - более медленный код с ограниченным вводом-выводом, что отлично подходит для asyncio.Тем не менее, похоже, что нет хорошего способа объединить многопроцессорность и асинхронность.

Я пробовал https://github.com/dano/aioprocessing,, который использует многопоточные операции с потоками.Однако эта библиотека изначально не поддерживает обычные асинхронные операции;например, отмена подпрограммы ожидания на queue.get с этой библиотекой приведет к тупику.

Я также пытался использовать ProcessPoolExecutor, но передача многопроцессорных объектов этому исполнителю не работает, так какобъекты очереди не передаются при создании процесса.

import multiprocessing
import asyncio
import atexit
from concurrent.futures import ProcessPoolExecutor


@atexit.register
def kill_children():
    [p.kill() for p in multiprocessing.active_children()]


async def queue_get(queue: multiprocessing.Queue):
    executor = ProcessPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, queue.get)


async def main():
    queue = multiprocessing.Queue()
    get_task = asyncio.create_task(queue_get(queue))

    queue.put(None)

    print(await get_task)


if __name__ == "__main__":
    asyncio.run(main())

Запуск этого кода приводит к следующему исключению:

RuntimeError: Queue objects should only be shared between processes through inheritance

Есть ли способчисто ликвидировать разрыв между многопроцессорностью и асинхронностью?

1 Ответ

0 голосов
/ 09 июля 2019

Per Могу ли я каким-либо образом разделить асинхронную очередь с подпроцессом?

Приведенный выше код можно изменить для работы с многопроцессорной очередью, создав очередь через multiprocessing.Manager()

import multiprocessing
import asyncio
import atexit
from concurrent.futures import ProcessPoolExecutor


@atexit.register
def kill_children():
    [p.kill() for p in multiprocessing.active_children()]


async def queue_get(queue: multiprocessing.Queue):
    executor = ProcessPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, queue.get)


async def main():
    manager = multiprocessing.Manager()
    queue = manager.Queue()
    get_task = asyncio.create_task(queue_get(queue))

    queue.put(None)
    print(await get_task)


if __name__ == "__main__":
    asyncio.run(main())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...