Предотвращение тупиковых ситуаций много исследовалось, существуют некоторые практические решения, но в общем случае проблема неразрешима (я думаю, что она может быть сведена к проблеме остановки).
Чтобы проиллюстрировать практичность, рассмотрим это:
await asyncio.sleep(2 ** (1 / random.random()))
В зависимости от вашей удачи, он либо скоро вернется, либо «практически никогда».
Этот трюк можно использовать, чтобы показать, что основанную на обратном вызове программу невозможно предсказать:
f = asyncio.Future()
async foo():
await asyncio.sleep(2 ** (1 / random.random()))
f.set_result(None)
async bar():
await f
await asyncio.gather(foo(), bar())
Аналогично, это может быть применено к вашей "чистой" программе асинхронного ожидания / ожидания:
async def f():
await g_task
async def g():
await asyncio.wait(f_task,
asyncio.sleep(2 ** (1 / random.random())),
return_when=asyncio.FIRST_COMPLETED)
f_task = asyncio.create_task(f())
g_task = asyncio.create_task(g())
await f_task
В то же время, несовершенный, но практичный детектор тупиковых ситуаций может быть очень полезным, пожалуйста, учтитеотправка вашего кода в ядро asyncio devs и / или в автономную библиотеку.
В настоящее время рекомендуется запускать тесты с PYTHONASYNCIODEBUG=1
, который показывает не ожидаемые задачи (уничтоженные до прочтения результата / исключения).
Ваша библиотека могла бы быть лучше, например, она могла бы сообщать, когда какая-то задача занимала больше времени, чем X, или когда DAG задач в зависимости от данной задачи становился слишком большиме.