Завершение asyncio.gather в тайм-аут - PullRequest
0 голосов
/ 29 января 2019

Я видел asyncio.gather vs asyncio.wait , но я не уверен, решает ли это этот конкретный вопрос.Я хочу обернуть сопрограмму asyncio.gather() в asyncio.wait_for() аргументом timeout.Мне также нужно выполнить следующие условия:

  • return_exceptions=True (из asyncio.gather()) - вместо того, чтобы распространять исключения для задачи, ожидающей на gather(), я хочу включить экземпляры исключений в результаты
  • Порядок: сохраните свойство asyncio.gather(), что порядок результатов совпадает с порядком ввода.(Или, в противном случае, отобразите выход обратно на вход.).asyncio.wait_for() не соответствует этим критериям, и я не уверен в идеальном способе его достижения.

Тайм-аут для всего asyncio.gather() в списке ожидаемых -если они попадают в тайм-аут или возвращают исключение, то в любом из этих случаев следует просто поместить экземпляр исключения в список результатов.

Рассмотрим эту настройку:

>>> import asyncio
>>> import random
>>> from time import perf_counter
>>> from typing import Iterable
>>> from pprint import pprint
>>> 
>>> async def coro(i, threshold=0.4):
...     await asyncio.sleep(i)
...     if i > threshold:
...         # For illustration's sake - some coroutines may raise,
...         # and we want to accomodate that and just test for exception
...         # instances in the results of asyncio.gather(return_exceptions=True)
...         raise Exception("i too high")
...     return i
... 
>>> async def main(n, it: Iterable):
...     res = await asyncio.gather(
...         *(coro(i) for i in it),
...         return_exceptions=True
...     )
...     return res
... 
>>> 
>>> random.seed(444)
>>> n = 10
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> res = asyncio.run(main(n, it=it))
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")  # Expectation: ~1 seconds
Done main(10) in 0.86 seconds
>>> pprint(dict(zip(it, res)))
{0.01323751590501987: 0.01323751590501987,
 0.07422124156714727: 0.07422124156714727,
 0.3088946587429545: 0.3088946587429545,
 0.3113884366691503: 0.3113884366691503,
 0.4419557492849159: Exception('i too high'),
 0.4844375347808497: Exception('i too high'),
 0.5796792804615848: Exception('i too high'),
 0.6338658027451068: Exception('i too high'),
 0.7426396870165088: Exception('i too high'),
 0.8614799253779063: Exception('i too high')}

Программа выше,с n = 10, ожидаемое время выполнения составляет 0,5 секунды, плюс немного накладных расходов при асинхронном запуске.(random.random() будет равномерно распределено в [0, 1).)

Скажем, я хочу наложить это как время ожидания на всю операцию (т.е. на сопрограмму main()):

timeout = 0.5

Теперь я могу использовать asyncio.wait(), но проблема в том, что результатом являются set объекты, и поэтому определенно не может гарантировать свойство отсортированного возвращаемого значения asyncio.gather():

>>> async def main(n, it, timeout) -> tuple:
...     tasks = [asyncio.create_task(coro(i)) for i in it]
...     done, pending = await asyncio.wait(tasks, timeout=timeout)
...     return done, pending
... 
>>> timeout = 0.5
>>> random.seed(444)
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> done, pending = asyncio.run(main(n, it=it, timeout=timeout))
>>> for i in pending:
...     i.cancel()
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")
Done main(10) in 0.50 seconds
>>> done
{<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>, <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>}
>>> pprint(done)
{<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>,
 <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>}
>>> pprint(pending)
{<Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>}

Как указано выше, проблема в том, что я, по-видимому, не могу сопоставить task экземпляры с входными данными в iterable.Их идентификаторы задач фактически теряются внутри области функций с tasks = [asyncio.create_task(coro(i)) for i in it].Есть ли Pythonic способ / использование asyncio API для имитации поведения asyncio.gather() здесь?

1 Ответ

0 голосов
/ 29 января 2019

Взглянув на лежащую в основе сопрограмму _wait(), эта сопрограмма получает список задач и изменяет состояние этих задач на месте.Это означает, что в пределах main() значение tasks из tasks = [asyncio.create_task(coro(i)) for i in it] будет изменено при вызове на await asyncio.wait(tasks, timeout=timeout).Вместо того, чтобы возвращать кортеж (done, pending), один из обходных путей - просто вернуть tasks, что сохраняет порядок при вводе it.wait() / _wait() просто разделяет задачи на готовые / ожидающие подмножества, и в этом случае мы можем отбросить эти подмножества и использовать целые списки tasks, элементы которых были изменены.

Существует три возможныхВ этом случае состояния задач:

  • Задача, вернувшая допустимый результат (coro()), не вызвала исключение, и завершилась под timeout.Его .cancelled() будет False, и у него есть действительный .result(), который не является экземпляром исключения
  • Задание получило тайм-аут перед возможностью вернуть либо результат, либо вызвать исключение.Он покажет .cancelled(), а его .exception() вызовет CancelledError
  • Задачу, которой было разрешено время для завершения и вызвала исключение из coro();.cancelled() будет отображаться как False, а exception() повысит

(все это изложено в asyncio / futures.py .)


Иллюстрация:

>>> # imports/other code snippets - see question
>>> async def main(n, it, timeout) -> tuple:
...     tasks = [asyncio.create_task(coro(i)) for i in it]
...     await asyncio.wait(tasks, timeout=timeout)
...     return tasks  # *not* (done, pending)

>>> timeout = 0.5
>>> random.seed(444)
>>> n = 10
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> tasks = asyncio.run(main(n, it=it, timeout=timeout))
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")
Done main(10) in 0.50 seconds

>>> pprint(tasks)
[<Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>,
 <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>]

Теперь примените логику сверху, которая позволяет res сохранять порядок, соответствующий входам:

>>> res = []
>>> for t in tasks:
...     try:
...         r = t.result()
...     except Exception as e:
...         res.append(e)
...     else:
...         res.append(r)
>>> pprint(res)
[0.3088946587429545,
 0.01323751590501987,
 Exception('i too high'),
 CancelledError(),
 CancelledError(),
 CancelledError(),
 Exception('i too high'),
 0.3113884366691503,
 0.07422124156714727,
 CancelledError()]
>>> dict(zip(it, res))
{0.3088946587429545: 0.3088946587429545,
 0.01323751590501987: 0.01323751590501987,
 0.4844375347808497: Exception('i too high'),
 0.8614799253779063: concurrent.futures._base.CancelledError(),
 0.7426396870165088: concurrent.futures._base.CancelledError(),
 0.6338658027451068: concurrent.futures._base.CancelledError(),
 0.4419557492849159: Exception('i too high'),
 0.3113884366691503: 0.3113884366691503,
 0.07422124156714727: 0.07422124156714727,
 0.5796792804615848: concurrent.futures._base.CancelledError()}
...