Как перезапустить сервер flask через API, когда он использует многопроцессорность внутри - PullRequest
1 голос
/ 03 марта 2020

У меня есть приложение flask с официанткой, которое получает некоторые данные в пост-запросе, затем выполняет несколько длинных вычислений в long_function и возвращает результат. Эти вычисления параллельны, и я использую pebble, потому что мне нужна опция тайм-аута. Также я хочу, чтобы пользователь мог отправить запрос на перезапуск сервера (т.е. он хочет изменить количество потоков для waitress)

Я нашел это решение https://gist.github.com/naushadzaman/b65534d912f1551c7d8366b326b7a151 В основном это работает, но плохо взаимодействует с моим pebble пулом. У меня проблемы с перезагрузкой сервера, пока он находится в пуле. Если я использую long_function_without_pool, который не использует многопроцессорность, я могу перезагрузить сервер, даже если он в данный момент выполняет какую-то работу (результаты, конечно, теряются, но это то, что я хочу). Но с long_function мне нужно дождаться закрытия пула и только тогда я могу перезапустить сервер. Если я пытаюсь отправить запрос на перезапуск, пока пул еще открыт, я получаю сообщение об ошибке:

OSError: [Errno 98] Address already in use

Так что я полагаю, что p.terminate() не работает, если есть Pool работает.

Как я могу исправить этот код, или, может быть, я должен использовать другое решение?

Краткие инструкции о том, как повторить эту ошибку:

  1. запустить приложение

  2. отправить POST-запрос с пустым телом на http://localhost: 5221 /

  3. прежде чем получить ответ (у вас будет 5 секунд) отправьте GET-запрос без переменных на http://localhost: 5221 / restart /

  4. наслаждайтесь. Сервер завис и не отвечает ни на что

    import subprocess
    from flask import Flask
    from flask_restful import Api, Resource
    from flask_cors import CORS
    from webargs.flaskparser import parser, abort
    import json
    import time
    import sys
    from waitress import serve
    from multiprocessing import Process, Queue
    from concurrent.futures import TimeoutError
    from pebble import ProcessPool, ProcessExpired
    import functools
    
    some_queue = None
    
    
    APP = Flask(__name__)
    API = Api(APP)
    CORS(APP)
    
    @APP.route('/restart/', methods=['GET'], endpoint='start_flaskapp')
    def restart():
        try:
            some_queue.put("something")
            print("Restarted successfully")
            return("Quit")
        except:
            print("Failed in restart")
            return "Failed"
    
    def start_flaskapp(queue):
        global some_queue
        some_queue = queue
        API.add_resource(FractionsResource, "/")
        serve(APP, host='0.0.0.0', port=5221, threads=2)
    
    def long_function():
        with ProcessPool(5) as pool:
            data = [0, 1, 2, 3, 4]
            future = pool.map(functools.partial(add_const, const=1), data, timeout=5)
            iterator = future.result()
            result=[]
            while True:
                try:
                    result.append(next(iterator))
                except StopIteration:
                    break
                except TimeoutError as error:
                    print("function took longer than %d seconds" % error.args[1])
        return(result)
    
    def long_function_without_pool():
        data = [0, 1, 2, 3, 4]
        result = list(map(functools.partial(add_const, const=1), data))
        return(result)
    
    def add_const(number, const=0):
        time.sleep(5)
        return number+const
    
    class FractionsResource(Resource):
        @APP.route('/', methods=['POST'])
        def post():
            response = long_function()
            return(json.dumps(response))
    
    if __name__ == "__main__":
    
        q = Queue()
        p = Process(target=start_flaskapp, args=(q,))
        p.start()
        while True: #wathing queue, if there is no call than sleep, otherwise break
            if q.empty():
                time.sleep(1)
            else:
                break
        p.terminate() #terminate flaskapp and then restart the app on subprocess
        args = [sys.executable] + [sys.argv[0]]
        subprocess.call(args)
    
...