Есть ли способ сделать асинхронное получение из многопроцессорной очереди без блокировки? - PullRequest
0 голосов
/ 09 июля 2019

У меня есть устройство, которое нуждается в многопроцессорной обработке для десериализации и декодирования с привязкой к процессору, но в остальной части приложения очень медленный ввод-вывод и работа в сети, что созрело для asyncio;но кажется, что нет хорошего способа объединить эти два.Есть ли способ сделать асинхронное получение из многопроцессорной очереди без блокировки?

Я пытался https://github.com/dano/aioprocessing, но он зависает при отмене сопрограммы.

import multiprocessing
import asyncio
import atexit
from concurrent.futures import ProcessPoolExecutor


@atexit.register
def kill_children():
    [p.kill() for p in multiprocessing.active_children()]


async def queue_get(queue: multiprocessing.Queue):
    executor = ProcessPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, queue.get)


async def main():
    queue = multiprocessing.Queue()
    get_task = asyncio.create_task(queue_get(queue))

    queue.put(None)

    print(await get_task)


if __name__ == "__main__":
    asyncio.run(main())

Это неработать, потому что очередь не передается процессу до его создания.

...