Python, вызвать пул процессов, не блокируя цикл событий - PullRequest
0 голосов
/ 14 ноября 2018

Если я запускаю следующий код:

import asyncio
import time
import concurrent.futures

def cpu_bound(mul):
    for i in range(mul*10**8):
        i+=1
    print('result = ', i)
    return i

async def say_after(delay, what):
    print('sleeping async...')
    await asyncio.sleep(delay)
    print(what)

# The run_in_pool function must not block the event loop
async def run_in_pool():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        result = executor.map(cpu_bound, [1, 1, 1])

async def main():
    task1 = asyncio.create_task(say_after(0.1, 'hello'))
    task2 = asyncio.create_task(run_in_pool())
    task3 = asyncio.create_task(say_after(0.1, 'world'))

    print(f"started at {time.strftime('%X')}")
    await task1
    await task2
    await task3
    print(f"finished at {time.strftime('%X')}")

if __name__ == '__main__':
    asyncio.run(main())

Вывод:

started at 18:19:28
sleeping async...
result =  100000000
result =  100000000
result =  100000000
sleeping async...
hello
world
finished at 18:19:34

Это показывает, что цикл событий блокируется до завершения заданий, связанных с процессором (task2)и затем оно продолжается с task3.

Если я запускаю только одно задание, связанное с процессором (run_in_pool - следующее):

async def run_in_pool():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        result = await loop.run_in_executor(executor, cpu_bound, 1)

Тогда кажется, что событиецикл не блокируется, так как вывод:

started at 18:16:23
sleeping async...
sleeping async...
hello
world
result =  100000000
finished at 18:16:28

Как я могу запустить много связанных с процессором заданий (в task2) в пуле процессов, не блокируя цикл событий?

1 Ответ

0 голосов
/ 14 ноября 2018

Как вы обнаружили, вам нужно использовать собственный run_in_executor от asyncio, чтобы дождаться завершения отправленных задач, не блокируя цикл обработки событий. Asyncio не предоставляет эквивалент map, но его нетрудно эмулировать:

async def run_in_pool():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [loop.run_in_executor(executor, cpu_bound, i)
                   for i in (1, 1, 1)]
        result = await asyncio.gather(*futures)
...