Планирование сопрограммы внутри не-сопрограммного метода - PullRequest
0 голосов
/ 17 апреля 2020

Я поддерживаю проект, в котором у меня есть случай, когда мне нужно запланировать сопрограмму внутри синхронной функции, которая уже выполняется внутри события l oop.

Моя проблема сводится к следующему:

import asyncio

class SomeScheduler:
    def __init__(self):
        self.workers = []

    # This is not a coroutine yet
    def close(self):
        for worker in self.workers:
            worker.close()

    def register_worker(self, worker):
        self.workers.append(worker)

    async def run(self):
        for _ in range(3):
            print("Doing stuff")
            coros = map(lambda x: x.work(), self.workers)
            await asyncio.gather(*coros)
            await asyncio.sleep(1)


class SomeWorkerA:
    def close(self):
        print("Closing WorkerA")

    async def work(self):
        print("Working WorkerA")
        await asyncio.sleep(0.2)
        print("Done WorkerA")

class SomeWorkerB:
    def close(self):
        print("Closing WorkerB")

    async def work(self):
        print("Working WorkerB")
        await asyncio.sleep(0.4)
        print("Done WorkerB")

async def main():
    sched = SomeScheduler()
    sched.register_worker(SomeWorkerA())
    sched.register_worker(SomeWorkerB())

    try:
        await sched.run()
    finally:
        sched.close()
        print("Bye")


asyncio.run(main())

По историческим причинам c, SomeScheduler.close() не является сопрограммой, и я не могу изменить API (без большого обсуждения в команде).

Теперь у меня есть новый тип работника:

class SomeWorkerC:
    async def close(self):
        print("Closing WorkerC")
        await asyncio.sleep(10)
        print("Done closing WorkerC")

    async def work(self):
        print("Working WorkerC")
        await asyncio.sleep(0.4)
        print("Done WorkerC")

Если я добавлю sched.register_worker(SomeWorkerC()) в функцию main(), то проблема в том, что SomeWorkerC.close() не выполняется, я получаю это сообщение об ошибке:

RuntimeWarning: coroutine 'SomeWorkerC.close' was never awaited
  co = None

Это имеет смысл, поэтому я хотел изменить SomeScheduler.close() следующим образом:

class SomeScheduler:
    def __init__(self):
        self.workers = []

    def close(self):
        for worker in self.workers:
            co = worker.close()
            if isinstance(co, types.CoroutineType):
                loop = asyncio.get_running_loop()
                loop.create_task(co)

, который работает только частично правильно, вывод stdout:

...
Done WorkerA
Done WorkerB
Done WorkerC
Closing WorkerA
Closing WorkerB
Bye
Closing WorkerC

но я также ожидал, что print("Done closing WorkerC") будет выполнено, однако это не тот случай, как вы можете видеть из вывода stdout. Документация loop.create_task() гласит:

Расписание выполнения сопрограмм. Верните объект Task.

, поэтому я предполагал, что вся задача будет выполнена, но кажется, что только до тех пор, пока не был выполнен первый await, остальная часть задачи не была выполнена. Я не могу сделать loop.run_forever() или loop.run_until_complete(co), потому что l oop все еще работает.

Я даже пытался заменить l oop другим l oop:

class SomeScheduler:
    def __init__(self):
        self.workers = []

    def close(self):
        for worker in self.workers:
            co = worker.close()

            if isinstance(co, types.CoroutineType):
                current_loop = asyncio.get_running_loop()
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                asyncio.run(co)

, но это не так с RuntimeError: asyncio.run() cannot be called from a running event loop, и если вместо этого я asyncio.gather(co), то выполняется первая часть, но затем поднимается:

...
Closing WorkerA
Closing WorkerB
Bye
Closing WorkerC
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError

В этот момент я даже не уверен, может решить эту проблему, не меняя API и не делая SomeScheduler.close() подпрограммой. Как я могу решить это?

1 Ответ

0 голосов
/ 24 апреля 2020

Вы не можете сделать это, потому что асиновый c код в рабочей подпрограмме .close никогда не будет запущен в запущенном событии l oop, потому что в настоящее время он блокируется синхронным SomeScheduler * 1003. *; то есть асинхронный рабочий c не будет закрыт до тех пор, пока планировщик .close не прекратит блокировать событие l oop, что явно невозможно, поскольку закрытие рабочего находится ниже по потоку после закрытия планировщика.

Чтобы решить эту проблему, нужно вызвать asyn c рядом с планировщиком .close; например,

def shutdown():
    await scheduler.async_close()  # close async workers
    scheduler.close()  # close sync workers
...