Я застрял на некоторых сейчас и хотел спросить, может быть, у кого-то была похожая проблема.
Мне нужно запустить внешний исполняемый файл при запуске моего приложения, он должен быть живым все время и умирать при выходе из приложения. Но поскольку загрузка занимает целую вечность, я хотел записать вывод в режиме реального времени и ждать сообщения статистики.
Теперь интересная часть:
async def start_app(event):
command_raw = 'ping 127.0.0.1'
command = shlex.split(command_raw, posix="win" not in sys.platform)
create = asyncio.create_subprocess_exec(*command, stdout=asyncio.subprocess.PIPE)
proc = await create
while True:
data = await proc.stdout.readline()
line = data.decode('ascii').rstrip()
if line == 'Ping statistics for 127.0.0.1:':
break
event.set()
def runner_worker(runner_loop):
asyncio.set_event_loop(runner_loop)
runner_loop.run_forever()
if sys.platform == "win32":
runner_loop = asyncio.ProactorEventLoop()
event = Event()
t = Thread(target=runner_worker, args=(runner_loop,))
t.start()
f = functools.partial(start_app, event)
runner_loop.call_soon_threadsafe(f)
print('waiting for externall app to start')
event.wait(timeout=60)
print('reasuming application')
time.sleep(2)
print('ending...')
Как видите, я хотел, чтобы заблокированный цикл работал в другом потоке (который всегда будет заблокирован), который устанавливает флаг запуска приложения.
Здесь я даже не уверен, что это правильный подход.
Даже пропуская ошибку, которую я получил:
RuntimeWarning: coroutine 'start_app' was never awaited
self._callback(*self._args)
Сегодня я попробовал другой подход:
import asyncio
import concurrent.futures
import logging
import sys
import time
import shlex
import subprocess
def start_app_syn():
log = logging.getLogger('ext application')
log.info('starting application')
command_raw = 'ping -t 127.0.0.1'
command = shlex.split(command_raw, posix="win" not in sys.platform)
proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
while True:
line = proc.stdout.readline().rstrip()
if line == b'Reply from 127.0.0.1: bytes=32 time<1ms TTL=128':
break
return proc
async def run_blocking_tasks(executor):
log = logging.getLogger('run_blocking_tasks')
log.info('starting')
log.info('creating executor ')
loop = asyncio.get_event_loop()
log.info('waiting for application start')
runner = loop.run_in_executor(executor, start_app_syn, )
try:
proc = await asyncio.wait_for(runner, timeout=60)
log.info('application has started')
except TimeoutError as e:
log.error('Process didnot started in acceptable time')
raise e
return proc
if __name__ == '__main__':
logging.basicConfig(
level=logging.INFO,
format='%(threadName)10s %(name)18s: %(message)s',
stream=sys.stderr,
)
executor = concurrent.futures.ThreadPoolExecutor()
event_loop = asyncio.get_event_loop()
task = asyncio.ensure_future(run_blocking_tasks(executor), loop=event_loop)
proc = event_loop.run_until_complete(task)
print('App started')
print('doing a lot of stuff')
for i in range(5):
print('... ')
time.sleep(1)
print('Exiting...')
proc.kill()
event_loop.close()
Это работает. Но я не могу судить об этом хорошем подходе.