Я поддерживаю проект, в котором у меня есть случай, когда мне нужно запланировать сопрограмму внутри синхронной функции, которая уже выполняется внутри события l oop.
Моя проблема сводится к следующему:
import asyncio
class SomeScheduler:
def __init__(self):
self.workers = []
# This is not a coroutine yet
def close(self):
for worker in self.workers:
worker.close()
def register_worker(self, worker):
self.workers.append(worker)
async def run(self):
for _ in range(3):
print("Doing stuff")
coros = map(lambda x: x.work(), self.workers)
await asyncio.gather(*coros)
await asyncio.sleep(1)
class SomeWorkerA:
def close(self):
print("Closing WorkerA")
async def work(self):
print("Working WorkerA")
await asyncio.sleep(0.2)
print("Done WorkerA")
class SomeWorkerB:
def close(self):
print("Closing WorkerB")
async def work(self):
print("Working WorkerB")
await asyncio.sleep(0.4)
print("Done WorkerB")
async def main():
sched = SomeScheduler()
sched.register_worker(SomeWorkerA())
sched.register_worker(SomeWorkerB())
try:
await sched.run()
finally:
sched.close()
print("Bye")
asyncio.run(main())
По историческим причинам c, SomeScheduler.close()
не является сопрограммой, и я не могу изменить API (без большого обсуждения в команде).
Теперь у меня есть новый тип работника:
class SomeWorkerC:
async def close(self):
print("Closing WorkerC")
await asyncio.sleep(10)
print("Done closing WorkerC")
async def work(self):
print("Working WorkerC")
await asyncio.sleep(0.4)
print("Done WorkerC")
Если я добавлю sched.register_worker(SomeWorkerC())
в функцию main()
, то проблема в том, что SomeWorkerC.close()
не выполняется, я получаю это сообщение об ошибке:
RuntimeWarning: coroutine 'SomeWorkerC.close' was never awaited
co = None
Это имеет смысл, поэтому я хотел изменить SomeScheduler.close()
следующим образом:
class SomeScheduler:
def __init__(self):
self.workers = []
def close(self):
for worker in self.workers:
co = worker.close()
if isinstance(co, types.CoroutineType):
loop = asyncio.get_running_loop()
loop.create_task(co)
, который работает только частично правильно, вывод stdout
:
...
Done WorkerA
Done WorkerB
Done WorkerC
Closing WorkerA
Closing WorkerB
Bye
Closing WorkerC
но я также ожидал, что print("Done closing WorkerC")
будет выполнено, однако это не тот случай, как вы можете видеть из вывода stdout
. Документация loop.create_task()
гласит:
Расписание выполнения сопрограмм. Верните объект Task.
, поэтому я предполагал, что вся задача будет выполнена, но кажется, что только до тех пор, пока не был выполнен первый await
, остальная часть задачи не была выполнена. Я не могу сделать loop.run_forever()
или loop.run_until_complete(co)
, потому что l oop все еще работает.
Я даже пытался заменить l oop другим l oop:
class SomeScheduler:
def __init__(self):
self.workers = []
def close(self):
for worker in self.workers:
co = worker.close()
if isinstance(co, types.CoroutineType):
current_loop = asyncio.get_running_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.run(co)
, но это не так с RuntimeError: asyncio.run() cannot be called from a running
event loop
, и если вместо этого я asyncio.gather(co)
, то выполняется первая часть, но затем поднимается:
...
Closing WorkerA
Closing WorkerB
Bye
Closing WorkerC
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
В этот момент я даже не уверен, может решить эту проблему, не меняя API и не делая SomeScheduler.close()
подпрограммой. Как я могу решить это?