Здесь есть несколько проблем:
Вы создаете новый цикл событий при импорте, один раз , но закрываете цикл событий в своем представлении. Нет необходимости вообще закрывать цикл, потому что запрос second теперь не будет выполнен, поскольку цикл закрыт.
Цикл события asyncio равен не потокобезопасен и не должен быть разделен между потоками. Подавляющее большинство развертываний Flask будут использовать потоки для обработки входящих запросов. Ваш код несет эхо того, как это должно быть обработано вместо этого, но, к сожалению, это не правильный подход. Например, asyncio.get_child_watcher().attach_loop(eventLoop)
в основном избыточен, поскольку eventLoop = asyncio.new_event_loop()
, если он запускается в главном потоке, уже делает именно это.
Это основной кандидат на проблемы, с которыми вы сталкиваетесь.
Ваш код предполагает, что исполняемый файл действительно присутствует и является исполняемым. Вы должны обрабатывать OSError
исключения (и подклассы), потому что неквалифицированный s.py
будет работать только в том случае, если он выполнен исполняемым, начинается с строки #!
shebang и находится в PATH
. Он не будет работать только потому, что он находится в том же каталоге, и в любом случае вы не захотите полагаться на текущий рабочий каталог.
В вашем коде предполагается, что процесс закрывает стандартный выводв какой-то момент . Если подпроцесс никогда не закрывает стандартный вывод (что происходит автоматически при выходе из процесса), тогда ваш цикл async for line in process.stdout:
также будет ждать вечно. Попробуйте добавить в код таймауты, чтобы избежать блокировки в неисправном подпроцессе.
В документации Python asyncio есть два раздела, которые вы действительно хотели бы прочитать при использовании подпроцессов asyncio в мультипроцессоре. многопоточное приложение:
Секция Параллелизм и многопоточность , объясняющая, что Почти все асинхронные объекты не являются поточно-ориентированными ,Вы не хотите добавлять задачи в цикл непосредственно из других потоков;вы хотите использовать цикл обработки событий для каждого потока или использовать функцию asyncio.run_coroutine_threadsafe()
для запуска сопрограммы в цикле в определенном потоке.
для Pythonверсии до 3.7, вам также необходимо прочитать секцию Подпроцесс и Потоки секцию , поскольку до этой версии asyncio
использует неблокирующий вызов os.waitpid(-1, os.WNOHANG)
для отслеживания дочернего состояния иполагается на использование обработки сигналов (что может быть сделано только в основном потоке). Python 3.8 снял это ограничение (добавив новую реализацию дочернего наблюдателя , которая использует блокировку для каждого процесса os.waitpid()
, в отдельном потоке за счет дополнительной памяти.
Однако имеют , чтобы придерживаться стратегии дочернего наблюдателя по умолчанию. Вы можете использовать EventLoopPolicy.set_child_watcher()
и передавать другой экземпляр наблюдателя процесса . На практике этоозначает обратную передачу 3.8 ThreadedChildWatcher
реализации .
В вашем случае нет необходимости запускать новый цикл обработки событий для каждого потока. цикл, в отдельном потоке по мере необходимости. Если вы используете цикл в отдельном потоке, в зависимости от версии Python, вам может потребоваться запустить цикл в основном потоке , а также или использовать другой процессwatcher. Вообще говоря, запуск асинхронного цикла в главном потоке на сервере WSGI не будет легким или даже невозможным.
Так что вам нужно постоянно запускать цикл в отдельном файле. объявление, и вам нужно использовать дочерний наблюдатель процесса, который работает без цикла основного потока. Вот реализация для этого, и она должна работать для Python версий 3.6 и новее:
import asyncio
import itertools
import logging
import time
import threading
try:
# Python 3.8 or newer has a suitable process watcher
asyncio.ThreadedChildWatcher
except AttributeError:
# backport the Python 3.8 threaded child watcher
import os
import warnings
# Python 3.7 preferred API
_get_running_loop = getattr(asyncio, "get_running_loop", asyncio.get_event_loop)
class _Py38ThreadedChildWatcher(asyncio.AbstractChildWatcher):
def __init__(self):
self._pid_counter = itertools.count(0)
self._threads = {}
def is_active(self):
return True
def close(self):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def __del__(self, _warn=warnings.warn):
threads = [t for t in list(self._threads.values()) if t.is_alive()]
if threads:
_warn(
f"{self.__class__} has registered but not finished child processes",
ResourceWarning,
source=self,
)
def add_child_handler(self, pid, callback, *args):
loop = _get_running_loop()
thread = threading.Thread(
target=self._do_waitpid,
name=f"waitpid-{next(self._pid_counter)}",
args=(loop, pid, callback, args),
daemon=True,
)
self._threads[pid] = thread
thread.start()
def remove_child_handler(self, pid):
# asyncio never calls remove_child_handler() !!!
# The method is no-op but is implemented because
# abstract base class requires it
return True
def attach_loop(self, loop):
pass
def _do_waitpid(self, loop, expected_pid, callback, args):
assert expected_pid > 0
try:
pid, status = os.waitpid(expected_pid, 0)
except ChildProcessError:
# The child process is already reaped
# (may happen if waitpid() is called elsewhere).
pid = expected_pid
returncode = 255
logger.warning(
"Unknown child process pid %d, will report returncode 255", pid
)
else:
if os.WIFSIGNALED(status):
returncode = -os.WTERMSIG(status)
elif os.WIFEXITED(status):
returncode = os.WEXITSTATUS(status)
else:
returncode = status
if loop.get_debug():
logger.debug(
"process %s exited with returncode %s", expected_pid, returncode
)
if loop.is_closed():
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
else:
loop.call_soon_threadsafe(callback, pid, returncode, *args)
self._threads.pop(expected_pid)
# add the watcher to the loop policy
asyncio.get_event_loop_policy().set_child_watcher(_Py38ThreadedChildWatcher())
__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]
logger = logging.getLogger(__name__)
class EventLoopThread(threading.Thread):
loop = None
_count = itertools.count(0)
def __init__(self):
name = f"{type(self).__name__}-{next(self._count)}"
super().__init__(name=name, daemon=True)
def __repr__(self):
loop, r, c, d = self.loop, False, True, False
if loop is not None:
r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
return (
f"<{type(self).__name__} {self.name} id={self.ident} "
f"running={r} closed={c} debug={d}>"
)
def run(self):
self.loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
try:
shutdown_asyncgens = loop.shutdown_asyncgens()
except AttributeError:
pass
else:
loop.run_until_complete(shutdown_asyncgens)
loop.close()
asyncio.set_event_loop(None)
def stop(self):
loop, self.loop = self.loop, None
if loop is None:
return
loop.call_soon_threadsafe(loop.stop)
self.join()
_lock = threading.Lock()
_loop_thread = None
def get_event_loop():
global _loop_thread
with _lock:
if _loop_thread is None:
_loop_thread = EventLoopThread()
_loop_thread.start()
return _loop_thread.loop
def stop_event_loop():
global _loop_thread
with _lock:
if _loop_thread is not None:
_loop_thread.stop()
_loop_thread = None
def run_coroutine(coro):
return asyncio.run_coroutine_threadsafe(coro, get_event_loop())
Выше приведено то же общее решение 'run async with Flask', которое я опубликовал для Python3 Asyncioвызов по маршруту Flask , но с добавлением обратного порта ThreadedChildWatcher
.
Затем можно использовать цикл, возвращенный из get_event_loop()
, для запуска дочерних процессов, вызвав run_coroutine_threadsafe()
:
import asyncio
import locale
import logging
logger = logging.getLogger(__name__)
def get_command_output(cmd, timeout=None):
encoding = locale.getpreferredencoding(False)
async def run_async():
try:
process = await asyncio.create_subprocess_exec(
cmd, stdout=asyncio.subprocess.PIPE)
except OSError:
logging.exception("Process %s could not be started", cmd)
return
async for line in process.stdout:
line = line.decode(encoding)
# TODO: actually do something with the data.
print(line, flush=True)
process.kill()
logging.debug("Process for %s exiting with %i", cmd, process.returncode)
return await process.wait()
future = run_coroutine(run_async())
result = None
try:
result = future.result(timeout)
except asyncio.TimeoutError:
logger.warn('The child process took too long, cancelling the task...')
future.cancel()
except Exception as exc:
logger.exception(f'The child process raised an exception')
return result
Обратите внимание, что указанная выше функция может занять в секундах максимальное время, в течение которого вы будете ждать завершения подпроцесса.