Подпроцесс Python3 Flask asyncio в маршруте зависает - PullRequest
1 голос
/ 24 октября 2019

Я использую Flask 1.0.2 с Python 3.6 в Ubuntu 18.04. Мое приложение должно использовать asyncio и asyncio.create_subprocess_exec(), чтобы запустить фоновый скрипт, прочитать стандартный вывод из него и затем вернуть статус, когда скрипт будет выполнен.

Я в основном пытаюсь реализовать ответ из этого поста: Неблокирующее чтение для подпроцесса. ТРУБА в python

Сценарий успешно запущен, и я получаювсе мои ожидаемые результаты от него, но проблема в том, что он никогда не возвращается (то есть строка Killing subprocess now никогда не достигается). Когда я проверяю список процессов (ps) из терминала Linux, фоновый скрипт завершается.

Что я делаю неправильно и как я могу успешно выйти из цикла async for line in process.stdout?

Вверху моего файла после импорта я создаю цикл событий:

# Create a loop to run all the tasks in.
global eventLoop ; asyncio.set_event_loop(None)
eventLoop = asyncio.new_event_loop()
asyncio.get_child_watcher().attach_loop(eventLoop)

Я определяю свою асинхронную сопрограмму выше моего маршрута:

async def readAsyncFunctionAndKill(cmd):
    # Use global event loop
    global eventLoop

    print("[%s] Starting async Training Script ..." % (os.path.basename(__file__)))
    process = await asyncio.create_subprocess_exec(cmd,stdout=PIPE,loop=eventLoop)
    print("[%s] Starting to read stdout ..." % (os.path.basename(__file__)))
    async for line in process.stdout:
        line = line.decode(locale.getpreferredencoding(False))
        print("%s"%line, flush=True)
    print("[%s] Killing subprocess now ..." % (os.path.basename(__file__)))
    process.kill()
    print("[%s] Training process return code was: %s" % (os.path.basename(__file__), process.returncode))
    return await process.wait()  # wait for the child process to exit

И мой (сокращенный) маршрут здесь:

@app.route("/train_model", methods=["GET"])
def train_new_model():
    # Use global event loop
    global eventLoop   

    with closing(eventLoop):        
        eventLoop.run_until_complete(readAsyncFunctionAndKill("s.py"))

    return jsonify("done"), 200

Сценарий "s.py" называетсяпомечен как исполняемый и находится в том же рабочем каталоге. Здесь показан сокращенный скрипт (он содержит несколько подпроцессов и создает экземпляры классов PyTorch):

def main():

    # Ensure that swap is activated since we don't have enough RAM to train our model otherwise
    print("[%s] Activating swap now ..." % (os.path.basename(__file__)))
    subprocess.call("swapon -a", shell=True)

    # Need to initialize GPU
    print("[%s] Initializing GPU ..." % (os.path.basename(__file__)))
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    defaults.device = torch.device("cuda")
    with torch.cuda.device(0):
        torch.tensor([1.]).cuda()

    print("[%s] Cuda is Available: %s - with Name: %s ..." % (os.path.basename(__file__),torch.cuda.is_available(),torch.cuda.get_device_name(0)))

    try:

        print("[%s] Beginning to train new model and replace existing model ..." % (os.path.basename(__file__)))


        # Batch size
        bs = 16
        #bs = 8

        # Create ImageBunch
        tfms = get_transforms(do_flip=True,
                              flip_vert=True,
                              max_rotate=180.,
                              max_zoom=1.1,
                              max_lighting=0.5,
                              max_warp=0.1,
                              p_affine=0.75,
                              p_lighting=0.75)

        # Create databunch using folder names as class names
        # This also applies the transforms and batch size to the data
        os.chdir(TRAINING_DIR)
        data = ImageDataBunch.from_folder("TrainingData", ds_tfms=tfms, train='.', valid_pct=0.2, bs=bs)

        ...    

        # Create a new learner with an early stop callback
        learn = cnn_learner(data, models.resnet18, metrics=[accuracy], callback_fns=[
            partial(EarlyStoppingCallback, monitor='accuracy', min_delta=0.01, patience=3)])

        ... 

        print("[%s] All done training ..." % (os.path.basename(__file__)))

        # Success
        sys.exit(0)

    except Exception as err:

        print("[%s] Error training model [ %s ] ..." % (os.path.basename(__file__),err))
        sys.exit(255)

if __name__== "__main__":
  main()

1 Ответ

5 голосов
/ 29 октября 2019

Здесь есть несколько проблем:

  • Вы создаете новый цикл событий при импорте, один раз , но закрываете цикл событий в своем представлении. Нет необходимости вообще закрывать цикл, потому что запрос second теперь не будет выполнен, поскольку цикл закрыт.

  • Цикл события asyncio равен не потокобезопасен и не должен быть разделен между потоками. Подавляющее большинство развертываний Flask будут использовать потоки для обработки входящих запросов. Ваш код несет эхо того, как это должно быть обработано вместо этого, но, к сожалению, это не правильный подход. Например, asyncio.get_child_watcher().attach_loop(eventLoop) в основном избыточен, поскольку eventLoop = asyncio.new_event_loop(), если он запускается в главном потоке, уже делает именно это.

    Это основной кандидат на проблемы, с которыми вы сталкиваетесь.

  • Ваш код предполагает, что исполняемый файл действительно присутствует и является исполняемым. Вы должны обрабатывать OSError исключения (и подклассы), потому что неквалифицированный s.py будет работать только в том случае, если он выполнен исполняемым, начинается с строки #! shebang и находится в PATH. Он не будет работать только потому, что он находится в том же каталоге, и в любом случае вы не захотите полагаться на текущий рабочий каталог.

  • В вашем коде предполагается, что процесс закрывает стандартный выводв какой-то момент . Если подпроцесс никогда не закрывает стандартный вывод (что происходит автоматически при выходе из процесса), тогда ваш цикл async for line in process.stdout: также будет ждать вечно. Попробуйте добавить в код таймауты, чтобы избежать блокировки в неисправном подпроцессе.

В документации Python asyncio есть два раздела, которые вы действительно хотели бы прочитать при использовании подпроцессов asyncio в мультипроцессоре. многопоточное приложение:

  • Секция Параллелизм и многопоточность , объясняющая, что Почти все асинхронные объекты не являются поточно-ориентированными ,Вы не хотите добавлять задачи в цикл непосредственно из других потоков;вы хотите использовать цикл обработки событий для каждого потока или использовать функцию asyncio.run_coroutine_threadsafe() для запуска сопрограммы в цикле в определенном потоке.

  • для Pythonверсии до 3.7, вам также необходимо прочитать секцию Подпроцесс и Потоки секцию , поскольку до этой версии asyncio использует неблокирующий вызов os.waitpid(-1, os.WNOHANG) для отслеживания дочернего состояния иполагается на использование обработки сигналов (что может быть сделано только в основном потоке). Python 3.8 снял это ограничение (добавив новую реализацию дочернего наблюдателя , которая использует блокировку для каждого процесса os.waitpid(), в отдельном потоке за счет дополнительной памяти.

    Однако имеют , чтобы придерживаться стратегии дочернего наблюдателя по умолчанию. Вы можете использовать EventLoopPolicy.set_child_watcher() и передавать другой экземпляр наблюдателя процесса . На практике этоозначает обратную передачу 3.8 ThreadedChildWatcher реализации .

В вашем случае нет необходимости запускать новый цикл обработки событий для каждого потока. цикл, в отдельном потоке по мере необходимости. Если вы используете цикл в отдельном потоке, в зависимости от версии Python, вам может потребоваться запустить цикл в основном потоке , а также или использовать другой процессwatcher. Вообще говоря, запуск асинхронного цикла в главном потоке на сервере WSGI не будет легким или даже невозможным.

Так что вам нужно постоянно запускать цикл в отдельном файле. объявление, и вам нужно использовать дочерний наблюдатель процесса, который работает без цикла основного потока. Вот реализация для этого, и она должна работать для Python версий 3.6 и новее:

import asyncio
import itertools
import logging
import time
import threading

try:
    # Python 3.8 or newer has a suitable process watcher
    asyncio.ThreadedChildWatcher
except AttributeError:
    # backport the Python 3.8 threaded child watcher
    import os
    import warnings

    # Python 3.7 preferred API
    _get_running_loop = getattr(asyncio, "get_running_loop", asyncio.get_event_loop)

    class _Py38ThreadedChildWatcher(asyncio.AbstractChildWatcher):
        def __init__(self):
            self._pid_counter = itertools.count(0)
            self._threads = {}

        def is_active(self):
            return True

        def close(self):
            pass

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

        def __del__(self, _warn=warnings.warn):
            threads = [t for t in list(self._threads.values()) if t.is_alive()]
            if threads:
                _warn(
                    f"{self.__class__} has registered but not finished child processes",
                    ResourceWarning,
                    source=self,
                )

        def add_child_handler(self, pid, callback, *args):
            loop = _get_running_loop()
            thread = threading.Thread(
                target=self._do_waitpid,
                name=f"waitpid-{next(self._pid_counter)}",
                args=(loop, pid, callback, args),
                daemon=True,
            )
            self._threads[pid] = thread
            thread.start()

        def remove_child_handler(self, pid):
            # asyncio never calls remove_child_handler() !!!
            # The method is no-op but is implemented because
            # abstract base class requires it
            return True

        def attach_loop(self, loop):
            pass

        def _do_waitpid(self, loop, expected_pid, callback, args):
            assert expected_pid > 0

            try:
                pid, status = os.waitpid(expected_pid, 0)
            except ChildProcessError:
                # The child process is already reaped
                # (may happen if waitpid() is called elsewhere).
                pid = expected_pid
                returncode = 255
                logger.warning(
                    "Unknown child process pid %d, will report returncode 255", pid
                )
            else:
                if os.WIFSIGNALED(status):
                    returncode = -os.WTERMSIG(status)
                elif os.WIFEXITED(status):
                    returncode = os.WEXITSTATUS(status)
                else:
                    returncode = status

                if loop.get_debug():
                    logger.debug(
                        "process %s exited with returncode %s", expected_pid, returncode
                    )

            if loop.is_closed():
                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            else:
                loop.call_soon_threadsafe(callback, pid, returncode, *args)

            self._threads.pop(expected_pid)

    # add the watcher to the loop policy
    asyncio.get_event_loop_policy().set_child_watcher(_Py38ThreadedChildWatcher())

__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]

logger = logging.getLogger(__name__)

class EventLoopThread(threading.Thread):
    loop = None
    _count = itertools.count(0)

    def __init__(self):
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)

    def __repr__(self):
        loop, r, c, d = self.loop, False, True, False
        if loop is not None:
            r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={r} closed={c} debug={d}>"
        )

    def run(self):
        self.loop = loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            loop.run_forever()
        finally:
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            loop.close()
            asyncio.set_event_loop(None)

    def stop(self):
        loop, self.loop = self.loop, None
        if loop is None:
            return
        loop.call_soon_threadsafe(loop.stop)
        self.join()

_lock = threading.Lock()
_loop_thread = None

def get_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is None:
            _loop_thread = EventLoopThread()
            _loop_thread.start()
        return _loop_thread.loop

def stop_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is not None:
            _loop_thread.stop()
            _loop_thread = None

def run_coroutine(coro):
    return asyncio.run_coroutine_threadsafe(coro, get_event_loop())

Выше приведено то же общее решение 'run async with Flask', которое я опубликовал для Python3 Asyncioвызов по маршруту Flask , но с добавлением обратного порта ThreadedChildWatcher.

Затем можно использовать цикл, возвращенный из get_event_loop(), для запуска дочерних процессов, вызвав run_coroutine_threadsafe():

import asyncio
import locale
import logging

logger = logging.getLogger(__name__)


def get_command_output(cmd, timeout=None):
    encoding = locale.getpreferredencoding(False)

    async def run_async():
        try:
            process = await asyncio.create_subprocess_exec(
                cmd, stdout=asyncio.subprocess.PIPE)
        except OSError:
            logging.exception("Process %s could not be started", cmd)
            return

        async for line in process.stdout:
            line = line.decode(encoding)
            # TODO: actually do something with the data.
            print(line, flush=True)

        process.kill()
        logging.debug("Process for %s exiting with %i", cmd, process.returncode)

        return await process.wait()

    future = run_coroutine(run_async())
    result = None
    try:
        result = future.result(timeout)
    except asyncio.TimeoutError:
        logger.warn('The child process took too long, cancelling the task...')
        future.cancel()
    except Exception as exc:
        logger.exception(f'The child process raised an exception')
    return result

Обратите внимание, что указанная выше функция может занять в секундах максимальное время, в течение которого вы будете ждать завершения подпроцесса.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...