У меня есть приложение flask с официанткой, которое получает некоторые данные в пост-запросе, затем выполняет несколько длинных вычислений в long_function
и возвращает результат. Эти вычисления параллельны, и я использую pebble
, потому что мне нужна опция тайм-аута. Также я хочу, чтобы пользователь мог отправить запрос на перезапуск сервера (т.е. он хочет изменить количество потоков для waitress
)
Я нашел это решение https://gist.github.com/naushadzaman/b65534d912f1551c7d8366b326b7a151 В основном это работает, но плохо взаимодействует с моим pebble
пулом. У меня проблемы с перезагрузкой сервера, пока он находится в пуле. Если я использую long_function_without_pool
, который не использует многопроцессорность, я могу перезагрузить сервер, даже если он в данный момент выполняет какую-то работу (результаты, конечно, теряются, но это то, что я хочу). Но с long_function
мне нужно дождаться закрытия пула и только тогда я могу перезапустить сервер. Если я пытаюсь отправить запрос на перезапуск, пока пул еще открыт, я получаю сообщение об ошибке:
OSError: [Errno 98] Address already in use
Так что я полагаю, что p.terminate()
не работает, если есть Pool
работает.
Как я могу исправить этот код, или, может быть, я должен использовать другое решение?
Краткие инструкции о том, как повторить эту ошибку:
-
запустить приложение
отправить POST-запрос с пустым телом на http://localhost: 5221 /
прежде чем получить ответ (у вас будет 5 секунд) отправьте GET-запрос без переменных на http://localhost: 5221 / restart /
наслаждайтесь. Сервер завис и не отвечает ни на что
import subprocess
from flask import Flask
from flask_restful import Api, Resource
from flask_cors import CORS
from webargs.flaskparser import parser, abort
import json
import time
import sys
from waitress import serve
from multiprocessing import Process, Queue
from concurrent.futures import TimeoutError
from pebble import ProcessPool, ProcessExpired
import functools
some_queue = None
APP = Flask(__name__)
API = Api(APP)
CORS(APP)
@APP.route('/restart/', methods=['GET'], endpoint='start_flaskapp')
def restart():
try:
some_queue.put("something")
print("Restarted successfully")
return("Quit")
except:
print("Failed in restart")
return "Failed"
def start_flaskapp(queue):
global some_queue
some_queue = queue
API.add_resource(FractionsResource, "/")
serve(APP, host='0.0.0.0', port=5221, threads=2)
def long_function():
with ProcessPool(5) as pool:
data = [0, 1, 2, 3, 4]
future = pool.map(functools.partial(add_const, const=1), data, timeout=5)
iterator = future.result()
result=[]
while True:
try:
result.append(next(iterator))
except StopIteration:
break
except TimeoutError as error:
print("function took longer than %d seconds" % error.args[1])
return(result)
def long_function_without_pool():
data = [0, 1, 2, 3, 4]
result = list(map(functools.partial(add_const, const=1), data))
return(result)
def add_const(number, const=0):
time.sleep(5)
return number+const
class FractionsResource(Resource):
@APP.route('/', methods=['POST'])
def post():
response = long_function()
return(json.dumps(response))
if __name__ == "__main__":
q = Queue()
p = Process(target=start_flaskapp, args=(q,))
p.start()
while True: #wathing queue, if there is no call than sleep, otherwise break
if q.empty():
time.sleep(1)
else:
break
p.terminate() #terminate flaskapp and then restart the app on subprocess
args = [sys.executable] + [sys.argv[0]]
subprocess.call(args)