Проблема, которую нужно решить (упрощенно)
Допустим, у меня есть 26 задач для параллельного выполнения. Чтобы минимизировать нагрузку на сервер, я решил запустить их по 10 одновременно: сначала 10 задач параллельно, затем следующие 10, наконец, оставшиеся 6.
Я написал простой скрипт для достижения этой цели:
import asyncio
from string import ascii_uppercase
from typing import List
TASK_NAMES = ascii_uppercase # 26 fake tasks in total
class BatchWorker:
"""Run a list of tasks in batch."""
BATCH_SIZE = 10
def __init__(self, tasks: List[asyncio.Task]):
self._tasks = list(tasks)
@property
def batch_of_tasks(self):
"""Yield all tasks by chunks of `BATCH_SIZE`"""
start = 0
while 'there are items remaining in the list':
end = start + self.BATCH_SIZE
chunk = self._tasks[start:end]
if not chunk:
break
yield chunk
start = end
async def run(self):
print(f'Running {self.BATCH_SIZE} tasks at a time')
for batch in self.batch_of_tasks:
print(f'\nWaiting for {len(batch)} tasks to complete...')
await asyncio.gather(*batch)
print('\nSleeping...\n---')
await asyncio.sleep(1)
async def task(name: str):
print(f"Task '{name}' is running...")
await asyncio.sleep(3) # Pretend to do something
async def main():
tasks = [
asyncio.create_task(task(name))
for name in TASK_NAMES
]
worker = BatchWorker(tasks)
await worker.run()
if __name__ == '__main__':
asyncio.run(main())
Что бы я ожидал
Я ожидал, что журналы будут выглядеть следующим образом:
Task A is running
[...]
Task J is running
Sleeping
---
Task K is running
[...]
Task T is running
Sleeping
---
[...]
... вы получите точку.
Что я на самом деле получаю
Однако на самой первой итерации рабочий ждет завершения всех 26 задач, несмотря на то, что я прошу собрать только партию из 10 из них . Проверьте журналы:
Running 10 tasks at a time
Waiting for 10 tasks to complete...
Task 'A' is running...
Task 'B' is running...
Task 'C' is running...
Task 'D' is running...
Task 'E' is running...
Task 'F' is running...
Task 'G' is running...
Task 'H' is running...
Task 'I' is running...
Task 'J' is running...
Task 'K' is running...
Task 'L' is running...
Task 'M' is running...
Task 'N' is running...
Task 'O' is running...
Task 'P' is running...
Task 'Q' is running...
Task 'R' is running...
Task 'S' is running...
Task 'T' is running...
Task 'U' is running...
Task 'V' is running...
Task 'W' is running...
Task 'X' is running...
Task 'Y' is running...
Task 'Z' is running...
Sleeping...
---
Waiting for 10 tasks to complete...
Sleeping...
---
Waiting for 6 tasks to complete...
Sleeping...
---
Как вы можете видеть, всего есть 3 пакета (как и ожидалось), но что-то делает только первый. Остальные 2 не имеют ничего общего.
Мои вопросы
Учитывая, что в официальных документах указано, что .gather()
будет работать только в ожидаемом режиме при условии в качестве параметра одновременно, почему мой сценарий выполняет все мои задачи вместо их кусков?
Что еще я должен использовать, чтобы заставить его работать так, как мне хотелось бы?