Я видел asyncio.gather vs asyncio.wait , но я не уверен, решает ли это этот конкретный вопрос.Я хочу обернуть сопрограмму asyncio.gather()
в asyncio.wait_for()
аргументом timeout
.Мне также нужно выполнить следующие условия:
return_exceptions=True
(из asyncio.gather()
) - вместо того, чтобы распространять исключения для задачи, ожидающей на gather()
, я хочу включить экземпляры исключений в результаты - Порядок: сохраните свойство
asyncio.gather()
, что порядок результатов совпадает с порядком ввода.(Или, в противном случае, отобразите выход обратно на вход.).asyncio.wait_for()
не соответствует этим критериям, и я не уверен в идеальном способе его достижения.
Тайм-аут для всего asyncio.gather()
в списке ожидаемых -если они попадают в тайм-аут или возвращают исключение, то в любом из этих случаев следует просто поместить экземпляр исключения в список результатов.
Рассмотрим эту настройку:
>>> import asyncio
>>> import random
>>> from time import perf_counter
>>> from typing import Iterable
>>> from pprint import pprint
>>>
>>> async def coro(i, threshold=0.4):
... await asyncio.sleep(i)
... if i > threshold:
... # For illustration's sake - some coroutines may raise,
... # and we want to accomodate that and just test for exception
... # instances in the results of asyncio.gather(return_exceptions=True)
... raise Exception("i too high")
... return i
...
>>> async def main(n, it: Iterable):
... res = await asyncio.gather(
... *(coro(i) for i in it),
... return_exceptions=True
... )
... return res
...
>>>
>>> random.seed(444)
>>> n = 10
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> res = asyncio.run(main(n, it=it))
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds") # Expectation: ~1 seconds
Done main(10) in 0.86 seconds
>>> pprint(dict(zip(it, res)))
{0.01323751590501987: 0.01323751590501987,
0.07422124156714727: 0.07422124156714727,
0.3088946587429545: 0.3088946587429545,
0.3113884366691503: 0.3113884366691503,
0.4419557492849159: Exception('i too high'),
0.4844375347808497: Exception('i too high'),
0.5796792804615848: Exception('i too high'),
0.6338658027451068: Exception('i too high'),
0.7426396870165088: Exception('i too high'),
0.8614799253779063: Exception('i too high')}
Программа выше,с n = 10
, ожидаемое время выполнения составляет 0,5 секунды, плюс немного накладных расходов при асинхронном запуске.(random.random()
будет равномерно распределено в [0, 1).)
Скажем, я хочу наложить это как время ожидания на всю операцию (т.е. на сопрограмму main()
):
timeout = 0.5
Теперь я могу использовать asyncio.wait()
, но проблема в том, что результатом являются set
объекты, и поэтому определенно не может гарантировать свойство отсортированного возвращаемого значения asyncio.gather()
:
>>> async def main(n, it, timeout) -> tuple:
... tasks = [asyncio.create_task(coro(i)) for i in it]
... done, pending = await asyncio.wait(tasks, timeout=timeout)
... return done, pending
...
>>> timeout = 0.5
>>> random.seed(444)
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> done, pending = asyncio.run(main(n, it=it, timeout=timeout))
>>> for i in pending:
... i.cancel()
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")
Done main(10) in 0.50 seconds
>>> done
{<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>, <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>}
>>> pprint(done)
{<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
<Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>,
<Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>,
<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
<Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>,
<Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>}
>>> pprint(pending)
{<Task cancelled coro=<coro() done, defined at <stdin>:1>>,
<Task cancelled coro=<coro() done, defined at <stdin>:1>>,
<Task cancelled coro=<coro() done, defined at <stdin>:1>>,
<Task cancelled coro=<coro() done, defined at <stdin>:1>>}
Как указано выше, проблема в том, что я, по-видимому, не могу сопоставить task
экземпляры с входными данными в iterable
.Их идентификаторы задач фактически теряются внутри области функций с tasks = [asyncio.create_task(coro(i)) for i in it]
.Есть ли Pythonic способ / использование asyncio API для имитации поведения asyncio.gather()
здесь?