Мне нужно выполнить некоторую блокирующую работу процессора, поэтому я запускаю новый AioProcess и использую AiPipe для обработки данных.
запуск процесса выглядит следующим образом:
async def launch_worker(app):
pipe_a, pipe_b = AioPipe()
process = AioProcess(target=worker, args=(pipe_b,))
process.start()
app.process, app.pipe = process, pipe_a
await sleep(0)
Когда я запускаю рабочий при запуске, все работает нормально.Однако, если я запускаю работника из представления, я получаю странное поведение.В частности, когда я пытаюсь завершить работу сервера с помощью ctrl-c KeyboardInterrupt, сервер зависает - он останавливает обработку новых запросов, но не завершает работу, пока я не выполню уничтожение ОС.
Вот полный сценарий:
from aiohttp import web
import logging
from logging.config import dictConfig
from aioprocessing import AioPipe, AioProcess
from asyncio import shield, sleep, get_event_loop
import uuid
import time
async def the_view(request):
msg = str(uuid.uuid4())[0:8]
if request.app.process is None:
logging.info("Launching worker process in view.")
await launch_worker(request.app)
pipe = request.app.pipe
logging.info("Webserver Sending {}".format(msg))
pipe.send(msg)
output = await shield(pipe.coro_recv())
logging.info("Webserver received {}".format(output))
return web.Response(text=output)
def worker(pipe):
try:
while True:
foo = pipe.recv()
if foo is None:
break
else:
time.sleep(3)
msg = str(uuid.uuid4())[0:8]
logging.info("Worker returning {} {}".format(foo, msg))
pipe.send("{} {}".format(foo, msg))
except KeyboardInterrupt:
pipe.send("Worker killed, argggghhhh")
async def launch_worker_at_start(app):
logging.info("Launching worker at start")
return await launch_worker(app)
async def launch_worker(app):
pipe_a, pipe_b = AioPipe()
process = AioProcess(target=worker, args=(pipe_b,))
process.start()
app.process, app.pipe = process, pipe_a
await sleep(0)
def build_app(loop):
app = web.Application(debug=True)
app.process, app.pipe = None, None
app.router.add_get('/', the_view)
## uncomment the following line and everything works ##
# loop.create_task(launch_worker_at_start(app))
#######################################################
return app
if __name__ == '__main__':
logging.info("Webserver starting")
loop = get_event_loop()
this_app = build_app(loop)
web.run_app(this_app, port=8081, loop=loop)
Я использую aiohttp == 2.3.10 и aioprocessing == 0.0.1
EDIT:
То же поведение с aiohttp == 3.4.4 и aioprocessing ==1.0.1 и python 3.5.6.