Многопроцессорная обработка Python: завершение задания сигнала без передачи объекта Event через очередь - PullRequest
0 голосов
/ 18 октября 2018

Краткое описание проблемы

У меня есть сервер фляг Python, на котором одна из конечных точек выполняет умеренный объем работы (реальный код читает, изменяет размеры и возвращает изображение).Я хочу оптимизировать конечную точку, чтобы она могла вызываться несколько раз параллельно.

Код, который у меня есть (показанный ниже), не работает, поскольку он основывается на передаче объекта multiprocessing.Event через multiprocessing.JoinableQueue, что недопустимо и приводит к следующей ошибке:

RuntimeError: Condition objects should only be shared between processes 
through inheritance

Как я могу использовать отдельный процесс для вычисления некоторых заданий и уведомления основного потока о завершении конкретного задания?


Подтверждение концепции

Flask может быть многопоточным, поэтому еслиодин запрос ожидает результата, другие потоки могут продолжать обрабатывать другие запросы.У меня есть базовое подтверждение концепции, которая показывает, что параллельные запросы могут быть оптимизированы с помощью многопроцессорной обработки: https://github.com/alanbacon/flask_multiprocessing

Пример кода на github порождает новый процесс для каждого запроса, который, как я понимаю, имеет значительные накладные расходы, плюс я 'Я заметил, что мой сервер проверки концепции падает, если имеется более 10 или 20 одновременных запросов, я подозреваю, что это происходит из-за слишком большого количества процессов, вызываемых.


Текущая попытка

Я пытался создать набор работников, которые выбирают рабочие места из очереди.Когда работа завершена, результат записывается в общую область памяти.Каждое задание содержит работу, которую необходимо выполнить, и объект Event, который можно установить после завершения задания, чтобы сигнализировать основному потоку.

Каждый поток запроса передает задание с вновь созданным объектом Event.Затем он немедленно ожидает этого события, а затем возвращает результат.Пока один поток запросов к серверу ожидает, сервер может использовать другие потоки для продолжения обслуживания других запросов.

Проблема, как упоминалось выше, заключается в том, что Event объекты не могут быть переданы таким способом.

Какой подход я должен предпринять, чтобы обойти эту проблему?

from flask import Flask, request, Response,
import multiprocessing
import uuid

app = Flask(__name__)

# flask config
app.config['PROPAGATE_EXCEPTIONS'] = True
app.config['DEBUG'] = False


def simpleWorker(complexity):
    temp = 0
    for i in range(0, complexity):
        temp += 1


mgr = multiprocessing.Manager()
results = mgr.dict()
joinableQueue = multiprocessing.JoinableQueue()
lock = multiprocessing.Lock()

def mpWorker(joinableQueue, lock, results):
    while True:
        next_task = joinableQueue.get()        # blocking call
        if next_task is None:                  # poison pill to kill worker
            break
        simpleWorker(next_task['complexity'])  # pretend to do heavy work
        result = next_task['val'] * 2          # compute result
        ID = next_task['ID']
        with lock:
            results[ID] = result               # output result to shared memory
        next_task['event'].set()               # tell main process result is calculated
        joinableQueue.task_done()              # remove task from queue


@app.route("/work/<ID>", methods=['GET'])
def work(ID=None):
    if request.method == 'GET':

        # send a task to the consumer and wait for it to finish
        uid = str(uuid.uuid4())
        event = multiprocessing.Event()

        # pass event to job so that job can tell this thread when processing is 
        # complete
        joinableQueue.put({
            'val': ID,
            'ID': uid,
            'event': event,
            'complexity': 100000000
        })

        event.wait()  # wait for result to be calculated

        # get result from shared memory area, and clean up
        with lock:
            result = results[ID]
            del results[ID]

        return Response(str(result), 200)


if __name__ == "__main__":
    num_consumers = multiprocessing.cpu_count() * 2
    consumers = [
        multiprocessing.Process(
            target=mpWorker,
            args=(joinableQueue, lock, results))
        for i in range(num_consumers)
    ]
    for c in consumers:
        c.start()

    host = '127.0.0.1'
    port = 8080
    app.run(host=host, port=port, threaded=True)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...