Многопроцессорная обработка Python с асинхронными функциями - PullRequest
0 голосов
/ 10 марта 2019

Я построил сервер веб-сокетов, упрощенная версия которого показана ниже:

import websockets, subprocess, asyncio, json, re, os, sys
from multiprocessing import Process

def docker_command(command_words):
    return subprocess.Popen(
        ["docker"] + command_words,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)

async def check_submission(websocket:object, submission:dict):
    exercise=submission["exercise"]
    with docker_command(["exec", "-w", "badkan", "grade_exercise", exercise]) as proc:
        for line in proc.stdout:
            print("> " + line)
            await websocket.send(line)

async def run(websocket, path):
    submission_json = await websocket.recv()   # returns a string
    submission = json.loads(submission_json)   # converts the string to a python dict

    ####
    await check_submission(websocket, submission)


websocketserver = websockets.server.serve(run, '0.0.0.0', 8888, origins=None)
asyncio.get_event_loop().run_until_complete(websocketserver)
asyncio.get_event_loop().run_forever()

Работает нормально, когда одновременно работает только один пользователь. Но когда несколько пользователей пытаются использовать сервер, сервер обрабатывает их последовательно, поэтому последующим пользователям приходится долго ждать.

Я попытался преобразовать его в многопроцессорный сервер, заменив строку, помеченную "####" ("await check_submission ..."), на:

p = Process(target=check_submission, args=(websocket, submission,))
p.start()

Но это не сработало - я получил предупреждение во время выполнения: "coroutine: 'check_submission' никогда не ожидалось", и я не увидел никаких выходных данных через веб-сокет.

Я также пытался заменить эти строки на:

loop = asyncio.get_event_loop()
loop.set_default_executor(ProcessPoolExecutor())
await loop.run_in_executor(None, check_submission, websocket, submission)

но получена другая ошибка: "не могу засечь asyncio.Future objects".

Как мне создать этот многопроцессорный сервер веб-сокетов?

1 Ответ

1 голос
/ 10 марта 2019

Проблема в том, что subprocess.Popen не является асинхронным, поэтому check_submission блокирует цикл обработки событий в ожидании следующей строки вывода Docker.

Вам вообще не нужно использовать многопроцессорность;поскольку вы блокируете во время ожидания подпроцесса, вам просто нужно переключиться с subprocess на asyncio.subprocess:

async def docker_command(command_words):
    return await asyncio.subprocess.create_subprocess_exec(
        *["docker"] + command_words,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)

async def check_submission(websocket:object, submission:dict):
    exercise = submission["exercise"]
    proc = await docker_command(["exec", "-w", "badkan", "grade_exercise", exercise])
    async for line in proc.stdout:
        print(b"> " + line)
        await websocket.send(line)
    await proc.wait()
...