Как правильно использовать run_in_executor? - PullRequest
0 голосов
/ 22 мая 2019

Я пытаюсь использовать run_in_executor и у меня есть несколько вопросов. Вот код (в основном копирование из документов)

import asyncio
import concurrent.futures


def cpu_bound(val):
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    print(f'Start task: {val}')
    sum(i * i for i in range(10 ** 7))
    print(f'End task: {val}')


async def async_task(val):
    print(f'Start async task: {val}')
    while True:
        print(f'Tick: {val}')
        await asyncio.sleep(1)


async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    for i in range(5):
        loop.create_task(async_task(i))

    # 1. Run in the default loop's executor:
    # for i in range(10):
    #     loop.run_in_executor(
    #         None, cpu_bound, i)
    # print('default thread pool')

    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
    #     for i in range(10):
    #         loop.run_in_executor(
    #             pool, cpu_bound, i)
    #     print('custom thread pool')

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor(max_workers = 10) as pool:
        for i in range(10):
            loop.run_in_executor(
                pool, cpu_bound, i)
        print('custom process pool')

    while True:
        await asyncio.sleep(1)


asyncio.run(main())

Случай 1: run_in_executor, где executor равно None: async_task выполняется одновременно с cpu_bound.

В других случаях async_task будут выполняться после выполнения cpu_bound. Я думал, когда мы используем ProcessPoolExecutor задачи не должны блокировать цикл. Где я не прав?

1 Ответ

0 голосов
/ 22 мая 2019

В других случаях async_task будут выполняться после выполнения cpu_bound. Я думал, когда мы используем ProcessPoolExecutor задачи не должны блокировать цикл. Где я не прав?

Проблема в том, что with XXXPoolExecutor() закрывает пул в конце блока with. Завершение работы пула ожидает завершения отложенных задач, что блокирует цикл обработки событий и несовместимо с asyncio. Поскольку в вашем первом варианте нет оператора with, у него нет этой проблемы.

Решение состоит в том, чтобы просто удалить оператор with и создать пул один раз (например, на верхнем уровне или в main()), и просто использовать в функции. Если вы хотите, вы можете явно закрыть пул, вызвав pool.shutdown() после завершения asyncio.run().

Также обратите внимание, что вы никогда не ожидаете фьючерса, возвращенного loop.run_in_executor. Это ошибка, и Asyncio, вероятно, предупредит вас об этом; вам, вероятно, следует собрать возвращенные значения в списке и ждать их, например, results = await asyncio.gather(*tasks). Это не только соберет результаты, но и обеспечит правильное распространение исключений, возникающих в функциях вне потока, на ваш код, а не на их удаление.

...