Объединение asyncio с несколькими рабочими ProcessPoolExecutor - PullRequest
0 голосов
/ 30 июня 2018

Можно ли взять блокирующую функцию, такую ​​как work, и запустить ее одновременно в ProcessPoolExecutor, в которой работает более одного работника?

import asyncio
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor

num_jobs = 4
queue = asyncio.Queue()
executor = ProcessPoolExecutor(max_workers=num_jobs)
loop = asyncio.get_event_loop()

def work():
    sleep(1)

async def producer():
    for i in range(num_jobs):
        results = await loop.run_in_executor(executor, work)
        await queue.put(results)

async def consumer():
    completed = 0
    while completed < num_jobs:
        job = await queue.get()
        completed += 1

s = time()
loop.run_until_complete(asyncio.gather(producer(), consumer()))
print("duration", time() - s)

Выполнение вышеуказанного на машине с более чем 4 ядрами занимает ~ 4 секунды. Как бы вы написали producer так, чтобы приведенный выше пример занимал всего ~ 1 секунду?

Ответы [ 2 ]

0 голосов
/ 30 июня 2018

Проблема в producer. Вместо того, чтобы запускать задания в фоновом режиме, он ожидает завершения каждого задания, тем самым сериализуя их. Если переписать producer, чтобы он выглядел так (и оставить consumer без изменений), вы получите ожидаемую длительность 1 с:

async def producer():
    for i in range(num_jobs):
        fut = loop.run_in_executor(executor, work)
        fut.add_done_callback(lambda f: queue.put_nowait(f.result()))
0 голосов
/ 30 июня 2018

await loop.run_in_executor(executor, work) блокирует цикл до тех пор, пока не завершится work, в результате за один раз будет работать только одна функция.

Для одновременного запуска заданий вы можете использовать asyncio.as_completed:

async def producer():
    tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
    for f in asyncio.as_completed(tasks, loop=loop):
        results = await f
        await queue.put(results)
...