При использовании asyncio.create_subprocess_exec
возвращается asyncio.subprocess.process
. Документация указывает на отсутствие методов типа poll
или is_alive
. Кажется, wait
или communicate
предлагают единственный способ увидеть, запущен ли процесс, но они блокируют вызовы, а в версии asyncio для связи нет опции тайм-аута.
Есть ли хороший способ проверить, жив ли подпроцесс asyncio в неблокирующем пути?
Лучшее, что я могу прийти для функции стиля is_alive
:
import asyncio
async def is_alive(proc):
try:
await asyncio.wait_for(proc.wait(), 0.001)
except asyncio.TimeoutError:
return True
else:
return False
Пустой случай использования:
async def foo():
proc = await asyncio.create_subprocess_exec('sleep', '5')
i = 0
res = True
while res:
res = await is_alive(proc)
print(f"[{i}] is_alive: {res}")
# ... do foo stuff while we wait ...
await asyncio.sleep(1)
i += 1
loop = asyncio.get_event_loop()
loop.run_until_complete(foo())
Выход:
[0] is_alive: True
[1] is_alive: True
[2] is_alive: True
[3] is_alive: True
[4] is_alive: True
[5] is_alive: False