В моем проекте у меня есть декоратор для вызова сопрограмм в синхронных методах:
def async_test(f):
def wrapper(*args, **kwargs):
coro = asyncio.coroutine(f)
future = coro(*args, **kwargs)
loop = asyncio.get_event_loop()
loop.run_until_complete(future)
return wrapper
@async_test
async def test_foo(self):
# test stuff
Мне нужен этот декоратор для запуска тестов непосредственно из моей IDE.
Но теперь мне нужно вызвать их из асинхронного метода (после await
), и у меня RuntimeError
:
RuntimeError: Этот цикл событийуже работает
Как решить эту проблему с помощью цикла asyncio?