asyncio.gather (* tasks) не может ожидать только подмножество всех задач - PullRequest
2 голосов
/ 26 апреля 2020

Проблема, которую нужно решить (упрощенно)

Допустим, у меня есть 26 задач для параллельного выполнения. Чтобы минимизировать нагрузку на сервер, я решил запустить их по 10 одновременно: сначала 10 задач параллельно, затем следующие 10, наконец, оставшиеся 6.

Я написал простой скрипт для достижения этой цели:

import asyncio
from string import ascii_uppercase
from typing import List

TASK_NAMES = ascii_uppercase  # 26 fake tasks in total


class BatchWorker:
    """Run a list of tasks in batch."""

    BATCH_SIZE = 10

    def __init__(self, tasks: List[asyncio.Task]):
        self._tasks = list(tasks)

    @property
    def batch_of_tasks(self):
        """Yield all tasks by chunks of `BATCH_SIZE`"""
        start = 0
        while 'there are items remaining in the list':
            end = start + self.BATCH_SIZE
            chunk = self._tasks[start:end]
            if not chunk:
                break
            yield chunk
            start = end

    async def run(self):
        print(f'Running {self.BATCH_SIZE} tasks at a time')
        for batch in self.batch_of_tasks:
            print(f'\nWaiting for {len(batch)} tasks to complete...')
            await asyncio.gather(*batch)
            print('\nSleeping...\n---')
            await asyncio.sleep(1)


async def task(name: str):
    print(f"Task '{name}' is running...")
    await asyncio.sleep(3)  # Pretend to do something


async def main():
    tasks = [
      asyncio.create_task(task(name))
      for name in TASK_NAMES
    ]
    worker = BatchWorker(tasks)
    await worker.run()


if __name__ == '__main__':
    asyncio.run(main())

Что бы я ожидал

Я ожидал, что журналы будут выглядеть следующим образом:

Task A is running
[...]
Task J is running
Sleeping
---
Task K is running
[...]
Task T is running
Sleeping
---
[...]

... вы получите точку.

Что я на самом деле получаю

Однако на самой первой итерации рабочий ждет завершения всех 26 задач, несмотря на то, что я прошу собрать только партию из 10 из них . Проверьте журналы:

Running 10 tasks at a time

Waiting for 10 tasks to complete...
Task 'A' is running...
Task 'B' is running...
Task 'C' is running...
Task 'D' is running...
Task 'E' is running...
Task 'F' is running...
Task 'G' is running...
Task 'H' is running...
Task 'I' is running...
Task 'J' is running...
Task 'K' is running...
Task 'L' is running...
Task 'M' is running...
Task 'N' is running...
Task 'O' is running...
Task 'P' is running...
Task 'Q' is running...
Task 'R' is running...
Task 'S' is running...
Task 'T' is running...
Task 'U' is running...
Task 'V' is running...
Task 'W' is running...
Task 'X' is running...
Task 'Y' is running...
Task 'Z' is running...

Sleeping...
---

Waiting for 10 tasks to complete...

Sleeping...
---

Waiting for 6 tasks to complete...

Sleeping...
---

Как вы можете видеть, всего есть 3 пакета (как и ожидалось), но что-то делает только первый. Остальные 2 не имеют ничего общего.

Мои вопросы

  1. Учитывая, что в официальных документах указано, что .gather() будет работать только в ожидаемом режиме при условии в качестве параметра одновременно, почему мой сценарий выполняет все мои задачи вместо их кусков?

  2. Что еще я должен использовать, чтобы заставить его работать так, как мне хотелось бы?

Ответы [ 2 ]

2 голосов
/ 27 апреля 2020

gather на самом деле не «запускает» ожидание, оно просто спит, пока событие l oop делает свое дело, и просыпается, когда полученное ожидание завершено. Ваш код выполняет следующие действия:

  1. используйте asyncio.create_task(), чтобы породить кучу ожидаемых в фоновом режиме.
  2. используйте asyncio.gather(), чтобы ждать партиями, пока некоторые из них не будут завершены.

Тот факт, что gather() в # 2 получает подмножество задач, созданных в # 1, не помешает успешной работе остальных задач, созданных в # 1.

Чтобы решить эту проблему, вы должны отложить вызов create_task() до последней точки. Фактически, поскольку gather() вызывает ensure_future() для своих аргументов (а ensure_future, вызываемый с объектом сопрограммы, в конечном итоге вызывает create_task), вам вообще не нужно вызывать create_task(). Если вы удалите вызов create_task() из main и просто передадите объекты сопрограмм в BatchWorker (а затем и gather), задачи будут запланированы и ожидаются в пакетах, так же, как вы этого хотите:

async def main():
    tasks = [task(name) for name in TASK_NAMES]
    worker = BatchWorker(tasks)
    await worker.run()
0 голосов
/ 27 апреля 2020

Я изменил ваш код так, чтобы он вел себя так, как я думаю, вы хотите, чтобы он работал:

import asyncio
from string import ascii_uppercase
from typing import List

TASK_NAMES = ascii_uppercase  # 26 fake tasks in total


class BatchWorker:
    """Run a list of tasks in batch."""

    BATCH_SIZE = 10

    def __init__(self, tasks: List[asyncio.Task]):
        self._tasks = list(tasks)

    @property
    def batch_of_tasks(self):
        """Yield all tasks by chunks of `BATCH_SIZE`"""
        start = 0
        while 'there are items remaining in the list':
            end = start + self.BATCH_SIZE
            chunk = self._tasks[start:end]
            if not chunk:
                break
            yield chunk
            start = end

    async def run(self):
        print(f'Running {self.BATCH_SIZE} tasks at a time')
        for batch in self.batch_of_tasks:
            print(f'\nWaiting for {len(batch)} tasks to complete...')
            await asyncio.wait(batch)

async def task(name: str):
    print(f"Task '{name}' is running...")
    await asyncio.sleep(3)  # Pretend to do something


async def main():
    tasks = [
      task(name)
      for name in TASK_NAMES
    ]
    worker = BatchWorker(tasks)
    await worker.run()


if __name__ == '__main__':
    asyncio.run(main())

В измененном коде мы составили список задач (не запланировано), а затем бросили их в случае l oop с wait, который мы затем ждем, когда они завершат sh, прежде чем разрешить для l oop продолжить. Таким образом, мы разбиваем задачи на группы не более десяти, как вы пытаетесь сделать.

Примечание: Как видно из приведенных ниже комментариев, в этом случае очень мало различий между сбором и ожиданием, поскольку мое первоначальное объяснение неверно.

...