Как вывести задачу из цикла событий asyncio в представлении? - PullRequest
1 голос
/ 30 июня 2019

В моем веб-приложении, написанном с использованием aiohttp, у меня есть 2 конечные точки. Сначала запускается асинхронная задача, которая представляет собой бесконечный цикл. Второй предназначен для решения определенной задачи. Поскольку асинхронная задача не имеет какой-либо концепции, я немного запутался. Можно ли сохранить какой-либо идентификатор задачи в базе данных? Это правильный способ сделать это или библиотека уже предоставила решение для такого рода проблем?

aiohttp_app / views.py

from aiohttp import web

import asyncio
import json


async def coro(frequency):
    while True:
         print('Infinite loop iteration')
         await asyncio.sleep(frequency)


def start_task(request):
    event_loop = asyncio.get_event_loop()
    task = event_loop.create_task(coro())
    # save some identifier of the task in the database to find it later
    response = dict()
    return web.json_response(json.dumps(response))


def stop_task(request):
     task = None  # here i must get a certain task outta event loop
     task.cancel()
     response = dict()
     return web.json_response(json.dumps(response))

Спасибо за любую помощь!

1 Ответ

0 голосов
/ 01 июля 2019

Вы можете сгенерировать простые монотонно увеличивающиеся числовые идентификаторы и иметь глобальный запрос, который отображает идентификатор на экземпляр задачи. Отображение будет удалено после завершения сопрограммы. Например (не проверено):

import asyncio, itertools

_next_id = itertools.count().__next__
_tasks = {}

def make_task(corofn, *coroargs):
    task_id = _next_id()
    async def wrapped_coro():
        try:
            return await corofn(*coroargs)
        finally:
            del _tasks[task_id]
    task = asyncio.create_task(wrapped_coro())
    _tasks[task_id] = task
    return task_id, task

def get_task(task_id):
    return _tasks[task_id]

Вы можете использовать его в start_task и stop_task:

def start_task(request):
    task_id, _ = make_task(coro)
    response = {'task_id': task_id}
    return web.json_response(json.dumps(response))

def stop_task(request):
     task_id = json.loads(await request.text())['task_id']
     task = get_task(task_id)
     task.cancel()
     response = {}
     return web.json_response(json.dumps(response))
...