Слушатель APScheduler, не работает asyncio.ensure_future - PullRequest
0 голосов
/ 25 января 2019

Скажите, где я не прав? Я создаю TornadoScheduler в контексте Tornado сервера. Назначьте слушателя и функцию выполнения. Но я получаю не совсем то, что мне нужно.

from tornado.concurrent import return_future, run_on_executor

class Users:
    @return_future
    @run_in_executor
     def save(self, callback=None):
         some code
         callback(some data)

scheduler.add_listener(_scheduler_listener, apscheduler.events.EVENT_ALL)

......

async def processing(event: JobEvent):
    data = await Users.get_all_users()  <-- comunicate with DB
    if isinstance(data.result(), Exception):
        raise data.result()
    .....
    done = await users.save() <-- comunicate with DB
    .....

def _scheduler_listener(event: JobEvent):
    asyncio.ensure_future(processing(event))

Планировщик запускает задачу, и все 3 события с кодом 512 - добавлено, 32768 - отправлено и 4096 - выполнено, поступают в слушатель, но оно asyncio.ensure_future() выполняется только 2 раза, когда 512 и 32768 кодируют. Я использую Tornado 5, TornadoScheduler. Когда я создаю задание, планировщик отправляет слушателю код 512, затем код 32768, а затем код 4096. Но когда приходит код 4096, asyncio.ensure_future не выполняется. Когда я нажимаю Crtl+C, отображается консоль:

RuntimeWarning: coroutine 'processing' was never awaited
2019-01-30 19:04:24,473 - asyncio - ERROR - Task was destroyed but it is pending! task: <Task pending coro=<processing() running at ....

В чем причина?

1 Ответ

0 голосов
/ 01 февраля 2019

Для запуска асинхронных функций вы должны использовать AsyncIOScheduler и объявить ваш вызываемый объект как (собственную) функцию сопрограммы.

...