Краткое описание проблемы
У меня есть сервер фляг Python, на котором одна из конечных точек выполняет умеренный объем работы (реальный код читает, изменяет размеры и возвращает изображение).Я хочу оптимизировать конечную точку, чтобы она могла вызываться несколько раз параллельно.
Код, который у меня есть (показанный ниже), не работает, поскольку он основывается на передаче объекта multiprocessing.Event
через multiprocessing.JoinableQueue
, что недопустимо и приводит к следующей ошибке:
RuntimeError: Condition objects should only be shared between processes
through inheritance
Как я могу использовать отдельный процесс для вычисления некоторых заданий и уведомления основного потока о завершении конкретного задания?
Подтверждение концепции
Flask может быть многопоточным, поэтому еслиодин запрос ожидает результата, другие потоки могут продолжать обрабатывать другие запросы.У меня есть базовое подтверждение концепции, которая показывает, что параллельные запросы могут быть оптимизированы с помощью многопроцессорной обработки: https://github.com/alanbacon/flask_multiprocessing
Пример кода на github порождает новый процесс для каждого запроса, который, как я понимаю, имеет значительные накладные расходы, плюс я 'Я заметил, что мой сервер проверки концепции падает, если имеется более 10 или 20 одновременных запросов, я подозреваю, что это происходит из-за слишком большого количества процессов, вызываемых.
Текущая попытка
Я пытался создать набор работников, которые выбирают рабочие места из очереди.Когда работа завершена, результат записывается в общую область памяти.Каждое задание содержит работу, которую необходимо выполнить, и объект Event
, который можно установить после завершения задания, чтобы сигнализировать основному потоку.
Каждый поток запроса передает задание с вновь созданным объектом Event
.Затем он немедленно ожидает этого события, а затем возвращает результат.Пока один поток запросов к серверу ожидает, сервер может использовать другие потоки для продолжения обслуживания других запросов.
Проблема, как упоминалось выше, заключается в том, что Event
объекты не могут быть переданы таким способом.
Какой подход я должен предпринять, чтобы обойти эту проблему?
from flask import Flask, request, Response,
import multiprocessing
import uuid
app = Flask(__name__)
# flask config
app.config['PROPAGATE_EXCEPTIONS'] = True
app.config['DEBUG'] = False
def simpleWorker(complexity):
temp = 0
for i in range(0, complexity):
temp += 1
mgr = multiprocessing.Manager()
results = mgr.dict()
joinableQueue = multiprocessing.JoinableQueue()
lock = multiprocessing.Lock()
def mpWorker(joinableQueue, lock, results):
while True:
next_task = joinableQueue.get() # blocking call
if next_task is None: # poison pill to kill worker
break
simpleWorker(next_task['complexity']) # pretend to do heavy work
result = next_task['val'] * 2 # compute result
ID = next_task['ID']
with lock:
results[ID] = result # output result to shared memory
next_task['event'].set() # tell main process result is calculated
joinableQueue.task_done() # remove task from queue
@app.route("/work/<ID>", methods=['GET'])
def work(ID=None):
if request.method == 'GET':
# send a task to the consumer and wait for it to finish
uid = str(uuid.uuid4())
event = multiprocessing.Event()
# pass event to job so that job can tell this thread when processing is
# complete
joinableQueue.put({
'val': ID,
'ID': uid,
'event': event,
'complexity': 100000000
})
event.wait() # wait for result to be calculated
# get result from shared memory area, and clean up
with lock:
result = results[ID]
del results[ID]
return Response(str(result), 200)
if __name__ == "__main__":
num_consumers = multiprocessing.cpu_count() * 2
consumers = [
multiprocessing.Process(
target=mpWorker,
args=(joinableQueue, lock, results))
for i in range(num_consumers)
]
for c in consumers:
c.start()
host = '127.0.0.1'
port = 8080
app.run(host=host, port=port, threaded=True)