aiohttp - различное поведение при запуске процесса обработки из представления и запуска - PullRequest
0 голосов
/ 26 ноября 2018

Мне нужно выполнить некоторую блокирующую работу процессора, поэтому я запускаю новый AioProcess и использую AiPipe для обработки данных.

запуск процесса выглядит следующим образом:

async def launch_worker(app):
    pipe_a, pipe_b = AioPipe()
    process = AioProcess(target=worker, args=(pipe_b,))
    process.start()
    app.process, app.pipe = process, pipe_a
    await sleep(0)

Когда я запускаю рабочий при запуске, все работает нормально.Однако, если я запускаю работника из представления, я получаю странное поведение.В частности, когда я пытаюсь завершить работу сервера с помощью ctrl-c KeyboardInterrupt, сервер зависает - он останавливает обработку новых запросов, но не завершает работу, пока я не выполню уничтожение ОС.

Вот полный сценарий:

from aiohttp import web
import logging
from logging.config import dictConfig
from aioprocessing import AioPipe, AioProcess
from asyncio import shield, sleep, get_event_loop

import uuid
import time


async def the_view(request):
    msg = str(uuid.uuid4())[0:8]
    if request.app.process is None:
        logging.info("Launching worker process in view.")
        await launch_worker(request.app)

    pipe = request.app.pipe
    logging.info("Webserver Sending {}".format(msg))
    pipe.send(msg)
    output = await shield(pipe.coro_recv())
    logging.info("Webserver received {}".format(output))
    return web.Response(text=output)


def worker(pipe):
    try:
        while True:
            foo = pipe.recv()
            if foo is None:
                break
            else:
                time.sleep(3)
                msg = str(uuid.uuid4())[0:8]
                logging.info("Worker returning {} {}".format(foo, msg))
                pipe.send("{} {}".format(foo, msg))
    except KeyboardInterrupt:
        pipe.send("Worker killed, argggghhhh")


async def launch_worker_at_start(app):
    logging.info("Launching worker at start")
    return await launch_worker(app)


async def launch_worker(app):
    pipe_a, pipe_b = AioPipe()
    process = AioProcess(target=worker, args=(pipe_b,))
    process.start()
    app.process, app.pipe = process, pipe_a
    await sleep(0)


def build_app(loop):
    app = web.Application(debug=True)
    app.process, app.pipe = None, None
    app.router.add_get('/', the_view)

    ## uncomment the following line and everything works ##
    # loop.create_task(launch_worker_at_start(app))  
    #######################################################

    return app


if __name__ == '__main__':
    logging.info("Webserver starting")
    loop = get_event_loop()
    this_app = build_app(loop)

    web.run_app(this_app, port=8081, loop=loop)

Я использую aiohttp == 2.3.10 и aioprocessing == 0.0.1

EDIT:

То же поведение с aiohttp == 3.4.4 и aioprocessing ==1.0.1 и python 3.5.6.

...