Попытка понять, как Tasks работает с блокировкой вызовов - PullRequest
0 голосов
/ 22 декабря 2019

Я недавно начал изучать библиотеку asyncio с целью замены приложения с большим потоком на async.

Чтение asyncio documentantion Я наткнулся на пример, гдеcreate_task используется. Поскольку сейчас я застрял на python 3.6, я изменил вызов create_task на ensure_future, получив текущий код:

# Python 3.6
import asyncio
import time


async def say_after(delay, what):
    print(f"start {what}")  # Added for better vizualization of what is happening
    await asyncio.sleep(delay)
    print(what)


async def main():
    task1 = asyncio.ensure_future(
        say_after(1, 'hello'))

    task2 = asyncio.ensure_future(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
    loop.close()

И с выводом:

started at 15:23:11
start hello
start world
hello
world
finished at 15:23:13

Из того, что я могу понять, цикл событий:

  • Сначала запускает задачу task1;
  • После нажатия asyncio.sleep он меняетконтекст ко второму task2
  • Когда первый сон закончился, он меняется на task1, и то же самое происходит с task2, когда его вызов сна завершен.

Со всемиТем не менее, одно из требований моего приложения заключается в том, что у нас есть несколько блокирующих вызовов, которые необходимо было бы преобразовать в сопрограмму. Я создал этот фиктивный код для тестирования функции run_in_executor с этой целью:

# Python 3.6
import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def normal_operations():
    print("start blocking")
    time.sleep(1)
    print("ended blocking")


async def async_operation():
    print("start non blocking")
    await asyncio.sleep(2)
    print("ended non blocking")


async def main():
    loop = asyncio.get_event_loop()

    print(f"started at {time.strftime('%X')}")

    with ThreadPoolExecutor() as pool:
        task1 = asyncio.ensure_future(
            loop.run_in_executor(pool, normal_operations)
        )

    task2 = asyncio.ensure_future(
        async_operation()
    )

    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
    loop.close()

Я ожидал, что результат будет похож на первый пример, но когда я запустил этот кодвывод был:

started at 15:28:06
start blocking
ended blocking
start non blocking
ended non blocking
finished at 15:28:09

Две функции выполнялись по порядку, не так, как в первом примере, когда вызовы start print вызывались один перед другим.

Я не уверенчто я делаю не так, я предполагаю, что функция run_in_executor на самом деле не создает асинхронный вызов, или, может быть, я просто реализовала это неправильно, я не знаю.

1 Ответ

1 голос
/ 22 декабря 2019

Хорошо, я думаю, что получил ошибку.

Я ждал неблокирующую операцию за пределами ThreadPoolExecutor, а в __exit__ dunder вызывается функция shutdown с параметром wait=True, поэтому в основном исполнитель блокировал мой код.

Фиксированный код:

async def main():
    loop = asyncio.get_event_loop()

    print(f"started at {time.strftime('%X')}")

    pool = ThreadPoolExecutor()

    task1 = asyncio.ensure_future(loop.run_in_executor(pool, normal_operations))
    task2 = asyncio.ensure_future(async_operation())

    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

    pool.shutdown(wait=True)

С ожидаемым результатом:

started at 16:20:48
start non blocking
start blocking
ended blocking
ended non blocking
finished at 16:20:50
...