Я создаю API-интерфейс Flask MethodView . Для конкретной конечной точки я использую данные запроса для запуска потенциально длительной команды. Вместо того, чтобы ждать завершения команды, я обертываю ее в multiprocessing.Process
, вызываю start, а затем возвращаю пользователю HTTP 202 вместе с URL-адресом, который они могут использовать для мониторинга состояния процесса.
class EndPointAPI(MethodView):
def __init__(self):
""" On init, filter requests missing JSON body."""
# Check for json payload
self.except = ["GET", "PUT", "DELETE" ]
if (request.method not in self.except) and not request.json:
abort(400)
def _long_running_function(self, json_data):
"""
In this function, I use the input JSON data
to write a script to the file system, then
use subprocess.run to execute it.
"""
return
def post(self):
""" """
# Get input data
json_data = request.json
# Kick off the long running function
p = Process(target=long_running_function, args=(json_data,))
p.start()
response = {
"result" : "job accepted",
"links" : {
"href" : "/monitor_job/",
}
}
return jsonify(response), 202
Похоже, процессы, запущенные в методе post
, становятся зомби после их завершения, но я не могу понять, как правильно отслеживать и очищать их, не блокируя выполнениеродительского метода . Я попытался реализовать поток мониторинга, как это было предложено в . Присоединение Python к процессу без блокировки родительского элемента . Насколько я понимаю, он предлагает запустить отдельный поток, который отслеживает очередь FIFO, а затем поместить дескриптор процесса в очередь перед возвратом родительской функции. Я попробовал реализацию (ниже), но похоже, что вы не можете передать объект процесса в поток, поскольку он содержит защищенный атрибут AuthenticationString
.
Traceback (most recent call last):
| File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
| obj = _ForkingPickler.dumps(obj)
| File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
| cls(buf, protocol).dump(obj)
| File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/process.py", line 291, in __reduce__
| 'Pickling an AuthenticationString object is '
| TypeError: Pickling an AuthenticationString object is disallowed for security reasons
Это моя реализация Python присоединяется к процессу, не блокируя родителя . Я понятия не имею, будет ли это работать, потому что вышеупомянутая ошибка выключает всю систему с самого начала. Будем очень благодарны за любые мысли или предложения о том, как я могу ответственно запускать эти процессы, не блокируя вызывающий метод.
from threading import Thread
from multiprocessing import Queue, ...
class Joiner(Thread):
def __init__(self, q):
super().__init__()
self.__q = q
def run(self):
while True:
child = self.__q.get()
if child == None:
return
child.join()
class EndPointAPI(MethodView):
def __init__(self):
""" On init, filter requests missing JSON body."""
self._jobs = Queue()
self._babysitter = Joiner(self._jobs)
self._babysitter.start()
# Check for json payload
self.except = ["GET", "PUT", "DELETE" ]
if (request.method not in self.except) and not request.json:
abort(400)
def _long_running_function(self, json_data):
"""
In this function, I use the input JSON data
to write a script to the file system, then
use subprocess.run to execute it.
"""
return
def post(self):
""" """
# Get input data
json_data = request.json
# Kick off the long running function
p = Process(target=long_running_function, args=(json_data,))
p.start()
self._jobs.put(p)
response = {
"result" : "job accepted",
"links" : {
"href" : "/monitor_job/",
}
}
return jsonify(response), 202