Я пытаюсь выяснить, насколько эффективно создать решение с двумя пулами процессов (Flask экземпляры и рабочие) в следующем коде:
import argparse
import uuid
import sqlite3
from flask import Flask, request, jsonify, make_response
from multiprocessing import Queue, Process, Lock, Manager
from ServerUtils import StandaloneApplication
from queue import Empty
app = Flask(__name__)
authorization_tokens = ["dummy_token"]
jobs = []
def worker(request_queue, type, manager_dict):
worker_instance = Worker()
conn = sqlite3.connect("main.db")
c = conn.cursor()
while True:
if not request_queue.empty():
try:
text, request_id = request_queue.get_nowait()
except Empty:
continue
try:
result = worker_instance.work(text)
c.execute("INSERT INTO parsed VALUES(?, ?)", (request_id, result))
conn.commit()
except:
c.execute("INSERT INTO parsed VALUES(?, ?)", (request_id, "ERROR"))
conn.commit()
@app.route("/", methods=["POST"])
def parse():
conn = sqlite3.connect("main.db")
c = conn.cursor()
token = request.headers["Authorization"] if "Authorization" in request.headers else ""
if token in authorization_tokens:
body = request.get_json()
request_id = str(uuid.uuid4())
queues[body["Type"]].put((body["Text"], request_id))
answer = None
while True:
try:
answer = c.execute("SELECT * FROM parsed WHERE id_ LIKE ?", (request_id,)).fetchall()
if not answer:
continue
else:
break
except:
import pdb; pdb.set_trace()
break
response = { "body" : { "Parsed text" : answer[0] } }
return make_response(jsonify(response), 200)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--workers", type=int)
parser.add_argument("--types", nargs="+", type=str)
args = parser.parse_args()
queues = { type: Queue() for type in args.types }
for num, type in zip([i for i in range(args.workers)], args.types):
jobs.append(Process(target=worker,
args=(queues[type],
type,
manager_dict)))
[job.start() for job in jobs]
options = {
'bind': '%s:%s' % ('127.0.0.1', '5000'),
'workers': args.workers + 2,
'timeout': 300,
}
StandaloneApplication(app, options).run()
Я выполняю этот код с помощью следующей команды :
python main.py --workers 4
Основная идея состоит в том, что у меня есть 4 процесса, которые одновременно обрабатывают отдельные запросы к серверу, но они используют один пул рабочих, поэтому в основном:
- Запрос идет к серверу ("/").
- Сервер выбирает с помощью gunicorn magi c процесс, который его обработает.
Этот процесс помещает текст в объект многопроцессорной обработки. Вопрос, который будет проанализирован:
queues[body["Type"]].put((body["Text"], request_id))
Один из рабочих получает эту задачу из очереди: text, request_id = request_queue.get_nowait()
- Результат от работника отправляется в базу данных (экземпляр multiprocessing.Manager.dict был совершенно неэффективным с точки зрения затрат времени).
- Затем процесс Flask получает результат и отправляет его клиенту.
Проблема в том, что многопроцессорная обработка в этом сценарии все еще медленнее, чем при использовании только одного рабочего:
python main.py --workers 1
Почему?