Параллельный будущий опрос серии блокирующих вызовов - PullRequest
1 голос
/ 26 марта 2020

Я пытаюсь сгенерировать механизм опроса для длительной задачи в Python. Для этого я использую параллельное Future и опрос с .done(). Задача существует из множества итераций, которые сами по себе блокируются, и я заключил их в asyn c функцию. У меня нет доступа к коду блокирующих функций, так как я вызываю стороннее программное обеспечение. Это минимальный пример моего текущего подхода:

import asyncio
import time

async def blocking_iteration():
    time.sleep(1)

async def long_running():
    for i in range(5):
        print(f"sleeping {i}")
        await blocking_iteration()

async def poll_run():
    future = asyncio.ensure_future(long_running())
    while not future.done():
        print("before polling")
        await asyncio.sleep(0.05)
        print("polling")
    future.result()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(poll_run())
    loop.close()

Результат этого:

before polling
sleeping 0
sleeping 1
sleeping 2
sleeping 3
sleeping 4
polling

Исходя из моего текущего понимания механизма асинхронности в Python, я ожидал l oop, чтобы разблокировать после первого сна, вернуть управление в l oop, которое go вернулось бы в оператор poll_run await и запускало бы вторую итерацию функции long_running только после последующего опроса. Таким образом, желаемый результат выглядит примерно так:

before polling
sleeping 0
polling
before polling
sleeping 1
polling
before polling
sleeping 2
polling
before polling
sleeping 3
polling
before polling
sleeping 4
polling

Может ли это быть достигнуто каким-либо образом при текущем подходе или это возможно другим способом?

РЕДАКТИРОВАТЬ

Благодаря @drjackild удалось решить ее, изменив

async def blocking_iteration():
    time.sleep(1)

на

def blocking():
    time.sleep(1)

async def blocking_iteration():
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, blocking)

1 Ответ

3 голосов
/ 26 марта 2020

time - синхронная библиотека, блокирующая весь основной поток при выполнении. Если у вас есть такие блокирующие вызовы в вашей программе, вы можете избежать блокировки с помощью потоков или исполнителей пула процессов (вы можете прочитать об этом здесь ). Или измените blocking_iteration на asyncio.sleep вместо time.sleep

UPD. Просто чтобы прояснить, вот неблокирующая версия, которая использует loop.run_in_executor с исполнителем по умолчанию. Пожалуйста, обратите внимание, что blocking_iteration сейчас без async

import asyncio
import concurrent.futures
import time

def blocking_iteration():
    time.sleep(1)

async def long_running():
    loop = asyncio.get_event_loop()
    for i in range(5):
        print(f"sleeping {i}")
        await loop.run_in_executor(None, blocking_iteration)

async def poll_run():
    task = asyncio.create_task(long_running())
    while not task.done():
        print("before polling")
        await asyncio.sleep(0.05)
        print("polling")
    print(task.result())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(poll_run())
    loop.close()
...