У меня есть устройство, которое нуждается в многопроцессорной обработке для обработки десериализации и декодирования входящих данных с помощью процессора;но остальная часть приложения - более медленный код с ограниченным вводом-выводом, что отлично подходит для asyncio.Тем не менее, похоже, что нет хорошего способа объединить многопроцессорность и асинхронность.
Я пробовал https://github.com/dano/aioprocessing,, который использует многопоточные операции с потоками.Однако эта библиотека изначально не поддерживает обычные асинхронные операции;например, отмена подпрограммы ожидания на queue.get
с этой библиотекой приведет к тупику.
Я также пытался использовать ProcessPoolExecutor
, но передача многопроцессорных объектов этому исполнителю не работает, так какобъекты очереди не передаются при создании процесса.
import multiprocessing
import asyncio
import atexit
from concurrent.futures import ProcessPoolExecutor
@atexit.register
def kill_children():
[p.kill() for p in multiprocessing.active_children()]
async def queue_get(queue: multiprocessing.Queue):
executor = ProcessPoolExecutor(max_workers=1)
loop = asyncio.get_running_loop()
return await loop.run_in_executor(executor, queue.get)
async def main():
queue = multiprocessing.Queue()
get_task = asyncio.create_task(queue_get(queue))
queue.put(None)
print(await get_task)
if __name__ == "__main__":
asyncio.run(main())
Запуск этого кода приводит к следующему исключению:
RuntimeError: Queue objects should only be shared between processes through inheritance
Есть ли способчисто ликвидировать разрыв между многопроцессорностью и асинхронностью?