Последовательная версия asyncio.gather - PullRequest
0 голосов
/ 04 апреля 2020

Я попытался создать метод, похожий на asyncio.gather, но который будет выполнять список задач последовательно, а не асинхронно:

async def in_sequence(*tasks):
    """Executes tasks in sequence"""
    for task in tasks:
        await task

Далее этот метод должен был использоваться следующим образом:

async def some_work(work_name):
    """Do some work"""
    print(f"Start {work_name}")
    await asyncio.sleep(1)
    if raise_exception:
        raise RuntimeError(f"{work_name} raise an exception")
    print(f"Finish {work_name}")

async def main():
    try:
        await asyncio.gather(
            some_work("work1"),         # work1, work2, in_sequence and work5 executed in concurrently
            some_work("work2"),
            in_sequence(
                some_work("work3"),     # work3 and work4 executed in sequence
                some_work("work4")
            ),
            some_work("work5"),


    except RuntimeError as error:
        print(error)                    # raise an exception at any point to terminate

И все работало нормально, пока я не попытался вызвать исключение в some_work:

async def main():
    try:
        await asyncio.gather(
            some_work("work1"),
            some_work("work2"),
            in_sequence(
                some_work("work3", raise_exception=True),       # raise an exception here
                some_work("work4")
            ),
            some_work("work5"),


    except RuntimeError as error:
        print(error)

Сразу после этого я получил следующее сообщение об ошибке:

RuntimeWarning: coroutine 'some_work' was never awaited

Я прочитал документацию и продолжил экспериментировать:

async def in_sequence(*tasks):
    """Executes tasks in sequence"""
    _tasks = []
    for task in tasks:
        _tasks.append(asyncio.create_task(task))

    for _task in _tasks:
        await _task

И эта версия сработала как положено!

В связи с этим у меня возникли следующие вопросы:

  1. Почему работает ли вторая версия, а первая нет?
  2. Есть ли в Asyncio инструменты для последовательного выполнения списка задач?
  3. Я выбрал правильный метод реализации или есть лучшие варианты?

Ответы [ 3 ]

1 голос
/ 04 апреля 2020
  1. Первая версия не работает, поскольку in_sequence не перехватывает исключение, которое может быть вызвано await task. Второй работает, потому что create_task создает объект Task , похожий на будущее, который запускает сопрограмму. Объект не возвращает / не передает результат свернутой сопрограммы. Когда вы await объект, он приостанавливается до тех пор, пока не будет результат или исключение set или пока он не будет отменен .

  2. Кажется, что нет.

  3. Вторая версия будет одновременно выполнять переданные сопрограммы, поэтому это неверная реализация. Если вы действительно хотите использовать некоторую функцию in_sequence, вы можете:
    • Каким-то образом отложить создание сопрограмм.
    • Групповое последовательное выполнение в функции async

Например:

async def in_sequence(*fn_and_args):
    for fn, args, kwargs in fn_and_args:
        await fn(*args, **kwargs)  # create a coro and await it in place

in_sequence(
    (some_work, ("work3",), {'raise_exception': True}),
    (some_work, ("work4",), {}),
)
async def in_sequence():
    await some_work("work3", raise_exception=True)
    await some_work("work4")
1 голос
/ 04 апреля 2020

И эта версия работала как ожидалось!

Проблема со второй версией заключается в том, что она на самом деле не запускает сопрограммы последовательно, она запускает их параллельно. Это связано с тем, что asyncio.create_task() планирует запуск сопрограммы параллельно с текущими сопрограммами. Поэтому, когда вы ожидаете задачи в al oop, вы фактически разрешаете выполнение всех задач в ожидании первой. Несмотря на видимость, весь l oop будет работать только столько времени, сколько будет самой длинной задачей. (Подробнее см. здесь .)

Предупреждение, отображаемое вашей первой версией, предназначено для предотвращения случайного создания сопрограммы, которую вы никогда не ожидаете, например, написав просто asyncio.sleep(1) вместо await asyncio.sleep(1). Что касается asyncio, main создает экземпляры объектов сопрограмм и передает их в in_sequence, который «забывает» ожидать некоторых из них.

Один из способов подавить предупреждающее сообщение - разрешить сопрограммам раскрутите, но отмените это немедленно. Например:

async def in_sequence(*coros):
    remaining = iter(coros)
    for coro in remaining:
        try:
            await coro
        except Exception:
            for c in remaining:
                asyncio.create_task(c).cancel()
            raise

Обратите внимание, что имя переменной, начинающееся с подчеркивания, помечает неиспользуемую переменную, поэтому вы не должны называть переменные, так что если вы действительно их используете.

1 голос
/ 04 апреля 2020

Вы сказали, что версия in_sequence работает (с asyncio.create_task), но я думаю, что это не так. Из документов

Заверните сопрограмму coro в задание и запланируйте его выполнение. Верните объект Task.

Кажется, что он запускает сопрограммы параллельно, но они вам нужны последовательно.

Итак, поэкспериментировали и нашли два способа, как это исправить

Используйте исходную функцию in_sequence и добавьте этот код, который скрывает эту ошибку:

import warnings
warnings.filterwarnings(
    'ignore',
    message=r'^coroutine .* was never awaited$',
    category=RuntimeWarning
)

Исправьте функцию in_sequence, например:

async def in_sequence(*tasks):
    for index, task in enumerate(tasks):
        try:
            await task
        except Exception as e:
            for task in tasks[index + 1:]:
                task.close()
            raise e

Ответы на другие вопросы:

  1. Это предупреждение вызывается кодом C ++, когда у вас нет ссылок на сопрограммы. простой код может показать вам эту идею (в терминале):

async def test():
    return 1

f = test()
f = None # after that you will get that error
не знаю см. Выше
...