Обработка таймаутов с помощью asyncio - PullRequest
2 голосов
/ 13 марта 2020

Отказ от ответственности: я впервые экспериментирую с модулем asyncio.

Я использую asyncio.wait следующим образом, чтобы попытаться поддержать функцию ожидания, ожидающую все результаты из набора асин c задач. Это часть большой библиотеки, поэтому я опускаю не относящийся к делу код.

Обратите внимание, что библиотека уже поддерживает отправку задач и использование таймаутов с ThreadPoolExecutors и ProcessPoolExecutors, поэтому меня не очень интересуют предложения по их использованию или вопросы о том, почему я делаю это с asyncio. К коду ...

import asyncio
from contextlib import suppress

... 

class AsyncIOSubmit(Node):
    def get_results(self, futures, timeout=None):
        loop = asyncio.get_event_loop()
        finished, unfinished = loop.run_until_complete(
            asyncio.wait(futures, timeout=timeout)
        )
        if timeout and unfinished:
            # Code options in question would go here...see below.
            raise asyncio.TimeoutError

Сначала я не беспокоился об отмене отложенных задач по тайм-ауту, но затем я получил предупреждение Task was destroyed but it is pending! при выходе из программы или loop.close. После небольшого исследования я нашел несколько способов отмены задач и ожидания их фактического отмены:

Вариант 1:

[task.cancel() for task in unfinished]
for task in unfinished:
    with suppress(asyncio.CancelledError):
        loop.run_until_complete(task)

Вариант 2:

[task.cancel() for task in unfinished]
loop.run_until_complete(asyncio.wait(unfinished))

Вариант 3:

# Not really an option for me, since I'm not in an `async` method
# and don't want to make get_results an async method.
[task.cancel() for task in unfinished]
for task in unfinished:
    await task

Вариант 4:

Какой-то тип l oop как в этом ответе. Похоже, мои другие варианты лучше, но в том числе и для полноты.


Опции 1 и 2, похоже, до сих пор работают нормально. Любой из этих вариантов может быть «правильным», но в связи с развитием asyncio за прошедшие годы примеры и предложения, касающиеся net, либо устарели, либо сильно различаются. Итак, мои вопросы ...

Вопрос 1

Есть ли практические различия между вариантами 1 и 2? Я знаю, что run_until_complete будет работать до тех пор, пока не завершится будущее, поэтому, поскольку Вариант 1 зацикливается в определенном порядке c, я полагаю, что он может вести себя иначе, если более ранние задачи на самом деле выполняются дольше. Я попытался взглянуть на исходный код asyncio, чтобы понять, действительно ли asyncio.wait эффективно делает то же самое со своими задачами / фьючерсами под капотом, но это было неочевидно.

Вопрос 2

Я полагаю, что если одна из задач находится в середине длительной операции блокировки, она не может быть немедленно отменена? Возможно, это зависит только от того, будет ли используемая базовая операция или библиотека сразу вызывать CancelledError или нет? Может быть, этого никогда не случится с библиотеками, разработанными для asyncio?

Поскольку я пытаюсь реализовать здесь функцию тайм-аута, я несколько чувствителен к этому. Если возможно, что для отмены этих вещей может потребоваться много времени, я бы подумал позвонить по номеру cancel и не ждать, пока это произойдет, или установить очень короткий тайм-аут, чтобы дождаться отмены до конца sh.

Вопрос 3

Возможно ли loop.run_until_complete (или действительно, базовый вызов async.wait) возвращает значения в unfinished по причине, отличной от тайм-аута? Если это так, то мне, очевидно, придется немного откорректировать логи c, но из docs кажется, что это невозможно.

1 Ответ

1 голос
/ 13 марта 2020

Есть ли практические различия между вариантами 1 и 2?

Нет. Вариант 2 выглядит лучше и может быть несколько более эффективным, но их эффект net такой же.

Я знаю, run_until_complete будет работать до тех пор, пока не завершится будущее, поэтому, так как Вариант 1 зацикливается на конкретный c порядок Я полагаю, что он мог бы вести себя иначе, если бы более ранние задачи на самом деле выполнялись дольше.

Поначалу кажется, что так, но на самом деле это не так, потому что loop.run_until_complete запускает все задачи, переданные в l oop, а не только те, которые переданы в качестве аргумента. Он просто останавливается , как только завершается заданное ожидаемое - это то, к чему относится «выполнение до завершения». L oop вызов run_until_complete поверх уже запланированных задач похож на следующий асин c код:

ts = [asyncio.create_task(asyncio.sleep(i)) for i in range(1, 11)]
# takes 10s, not 55s
for t in ts:
    await t

, который в свою очередь семантически эквивалентен следующему многопоточному коду:

ts = []
for i in range(1, 11):
    t = threading.Thread(target=time.sleep, args=(i,))
    t.start()
    ts.append(t)
# takes 10s, not 55s
for t in ts:
    t.join()

Другими словами, await t и run_until_complete(t) блокируются до завершения t, но разрешают все остальное - например, задачи, ранее запланированные с использованием asyncio.create_task(), также запускаться в течение этого времени. Таким образом, общее время выполнения будет равно времени выполнения самой длинной задачи, а не их сумме. Например, если первая задача занимает много времени, все остальные будут выполнены за это время, и их ожидающие вообще не будут спать.

Все это относится только к ожидающим задачам, которые были ранее Запланированное. Если вы попытаетесь применить это к сопрограммам, это не сработает:

# runs for 55s, as expected
for i in range(1, 11):
    await asyncio.sleep(i)

# also 55s
for i in range(1, 11):
   t = threading.Thread(target=time.sleep, args=(i,))
   t.start()
   t.join()

Это часто является камнем преткновения для начинающих пользователей asyncio, которые пишут код, эквивалентный последнему примеру asyncio, и ожидают, что он будет работать параллельно .

Я попытался просмотреть исходный код asyncio, чтобы понять, действительно ли asyncio.wait делает то же самое с его задачами / фьючерсами под капотом, но это не было очевидно.

asyncio.wait - это просто удобный API, который выполняет две функции:

  • преобразует входные аргументы во что-то, что реализует Future. Для сопрограмм это означает, что он отправляет их на событие l oop, как будто с create_task, что позволяет им работать независимо. Если вы начнете задавать задачи, как и вы, этот шаг будет пропущен.
  • использует add_done_callback, чтобы получить уведомление о завершении фьючерса, после чего он возобновляет свою вызывающую функцию.

Так что да, он делает то же самое, но с другой реализацией, потому что поддерживает много других функций.

Я предполагаю, что одна из задач находится в середине долгосрочной работы Операция блокировки не может быть отменена немедленно?

В asyncio не должно быть операций «блокировки», только те, которые приостанавливаются, и они должны быть немедленно отменены. Исключением является блокировка кода, прикрепленного к asyncio с run_in_executor, где основная операция вообще не будет отменена, но сопрограмма asyncio немедленно получит исключение.

Возможно, это зависит только от если используемая базовая операция или библиотека немедленно вызовет CancelledError или нет?

Библиотека не поднимает CancelledError, она получает это в точке ожидания, где произошло приостановление до отмены. Для библиотеки эффект отмены await ... прерывает ее ожидание и немедленно повышает CancelledError. Если исключение не обнаружено, исключение будет распространяться через функцию, и await будет вызывать всю процедуру до сопрограммы верхнего уровня, повышение которой CancelledError помечает всю задачу как отмененную. Хороший асинхронный код сделает это, возможно, используя finally, чтобы высвободить ресурсы уровня ОС, которые они содержат. Когда CancelledError пойман, код может решить не поднимать его повторно, и в этом случае отмена фактически игнорируется.

Возможно ли l oop .run_until_complete (или действительно, базовый Вызов async.wait) возвращает значения в незаконченном состоянии по причине, отличной от тайм-аута?

Если вы используете return_when=asyncio.ALL_COMPLETE (по умолчанию), это не должно быть возможным. Это вполне возможно при return_when=FIRST_COMPLETED, тогда это очевидно возможно независимо от времени ожидания.

...