Я пытаюсь создать периодическое задание для цикла событий asyncio, как показано ниже, однако я получаю исключение «RuntimeError: не могу повторно использовать уже ожидаемую подпрограмму». По-видимому, asyncio не позволяет ожидать той же ожидаемой функции, как обсуждалось в этой ветке ошибок . Вот как я пытался это реализовать:
import asyncio
class AsyncEventLoop:
def __init__(self):
self._loop = asyncio.get_event_loop()
def add_periodic_task(self, async_func, interval):
async def wrapper(_async_func, _interval):
while True:
await _async_func # This is where it goes wrong
await asyncio.sleep(_interval)
self._loop.create_task(wrapper(async_func, interval))
return
def start(self):
self._loop.run_forever()
return
Из-за моего цикла while та же самая ожидаемая функция (_async_func) будет выполняться с интервалом ожидания между ними. Я получил вдохновение для выполнения периодических задач от Как я могу периодически выполнять функцию с помощью asyncio?
.
Из упомянутой выше ветки ошибок я делаю вывод, что идея RuntimeError заключалась в том, чтобы разработчики случайно не ожидали одну и ту же сопрограмму дважды или более, поскольку сопрограмма будет помечена как выполненная и выдаст None вместо результата. Есть ли способ, которым я могу ожидать одну и ту же функцию более одного раза?