Обработка асинхронных тупиков - PullRequest
2 голосов
/ 25 марта 2019

Этот пример кода зависает на неопределенное время:

import asyncio


async def main():
    async def f():
        await g_task

    async def g():
        await f_task

    f_task = asyncio.create_task(f())
    g_task = asyncio.create_task(g())
    await f_task


asyncio.run(main())

Я ищу способ автоматического обнаружения и обработки взаимоблокировок, как это делает GoLang.

Пока что я придумал вариантasyncio.wait_for():

[РЕДАКТИРОВАТЬ] капитальный ремонт

https://gist.github.com/gimperiale/549cbad04c24d870145d3f38fbb8e6f0

1 строка изменения в исходном коде:

await wait_check_deadlock(f_task)

Работает, но с двумя основными проблемами:

  1. он опирается на asyncio.Task._fut_waiter, что является подробностью реализации CPython
  2. Заблокированные задачи останутся в оперативной памяти навсегда.aw.cancel() похоже ничего не делает.Если я получаю RecursionError, моя вспомогательная функция поднимается, asyncio.run () вызывает другой RecursionError, когда пытается отменить все задачи.

Существуют ли более надежные решения проблемы?

1 Ответ

1 голос
/ 26 марта 2019

Предотвращение тупиковых ситуаций много исследовалось, существуют некоторые практические решения, но в общем случае проблема неразрешима (я думаю, что она может быть сведена к проблеме остановки).

Чтобы проиллюстрировать практичность, рассмотрим это:

await asyncio.sleep(2 ** (1 / random.random()))

В зависимости от вашей удачи, он либо скоро вернется, либо «практически никогда».

Этот трюк можно использовать, чтобы показать, что основанную на обратном вызове программу невозможно предсказать:

f = asyncio.Future()

async foo():
    await asyncio.sleep(2 ** (1 / random.random()))
    f.set_result(None)

async bar():
    await f

await asyncio.gather(foo(), bar())

Аналогично, это может быть применено к вашей "чистой" программе асинхронного ожидания / ожидания:

async def f():
    await g_task

async def g():
    await asyncio.wait(f_task,
                       asyncio.sleep(2 ** (1 / random.random())),
                       return_when=asyncio.FIRST_COMPLETED)

f_task = asyncio.create_task(f())
g_task = asyncio.create_task(g())
await f_task

В то же время, несовершенный, но практичный детектор тупиковых ситуаций может быть очень полезным, пожалуйста, учтитеотправка вашего кода в ядро ​​asyncio devs и / или в автономную библиотеку.

В настоящее время рекомендуется запускать тесты с PYTHONASYNCIODEBUG=1, который показывает не ожидаемые задачи (уничтоженные до прочтения результата / исключения).

Ваша библиотека могла бы быть лучше, например, она могла бы сообщать, когда какая-то задача занимала больше времени, чем X, или когда DAG задач в зависимости от данной задачи становился слишком большиме.

...