Я нашел решение для своего собственного вопроса, когда писал его, и, поскольку рекомендуется ответить на ваш собственный вопрос о переполнении стека , я бы хотел поделиться этим с кем-то еще с той же проблемой.
Решением этой проблемы является обработка приложения flask как другого процесса вместо потока. Это достигается с помощью Process
из multiprocessing
модуля вместо Thread
из threading
модуля.
Я пришел к такому выводу после прочтения этого ответа переполнения стека относительно остановки flask без использования CTRL + C. После прочтения этого ответа я узнаю о различиях между multiprocessing
и threading
в этом ответе переполнения стека . Конечно, после этого я перешел к официальной документации по multiprocessing
модулю, , найденному здесь . Более конкретно, эта ссылка приведет вас прямо к классу Process
.
Я не могу полностью сформулировать , почему модуль multiprocessing
служит этому цель лучше, чем threading
, но я чувствую, что это имеет больше смысла для этого приложения. В конце концов, приложение flask действует как собственный сервер API, который отделен от моего теста, и мой тест проверяет вызовы к нему / ответы, которые он получает. По этой причине, я думаю, что для моего flask приложения имеет смысл иметь собственный процесс.
tl; dr
Использовать multiprocessing.Process
вместо threading.Thread
, и затем вызовите Process.terminate()
, чтобы завершить процесс, а затем Process.join()
, чтобы заблокировать, пока процесс не будет завершен.
пример:
import requests
import unittest
from multiprocessing import Process
class MyTest(unittest.TestCase):
def setUp(self):
test_port = 8000
self.test_url = f"http://0.0.0.0:{str(test_port)}"
self.app_process = Process(target=app.run, kwargs={"host": "0.0.0.0", "port": test_port, "debug": False})
self.app_process.start()
def test_a_test_that_contacts_the_server(self):
response = requests.post(
f"{self.test_url}/dosomething",
json={"foo": "bar"},
headers=foo_bar
)
is_successful = json.loads(response.text)["isSuccessful"]
self.assertTrue(is_successful, msg=json.loads(response.text)["message"])
def tearDown(self):
self.app_process.terminate()
self.app_process.join()
Проверяйте рано и часто проверяйте!