«Пауза», которую вы ищете, это именно то, что делает await
.Он блокирует текущую сопрограмму (позволяя другим делать успехи), пока та, которую он ожидает, не завершится.Ваш код в основном правильный, за исключением использования async with
вокруг open
.Вот более полная версия (не проверенная):
import asyncio, aiohttp
from itertools import islice
BATCH_SIZE = 1000 # start with something reasonable
async def main():
async with aiohttp.ClientSession() as session:
with open("test.txt") as f:
while True:
# take the next BATCH_SIZE lines
batch = [line.strip() for line in islice(f, BATCH_SIZE)]
if not batch:
# no more lines - we're done
break
await get_req_fn(batch, session)
При реализации get_req_fn
необходимо позаботиться о том, чтобы включить параллельное выполнение, а также дождаться завершения всего пакета.Ключевым компонентом для этого являются комбинаторы сопрограмм, функции, которые объединяют несколько сопрограмм в один ожидаемый объект.Мощный и очень простой в использовании gather
:
async def get_req_fn(urls, session):
coros = []
for url in urls:
coros.append(single_req(url, session))
await asyncio.gather(*coros)
gather
запускает заданные сопрограммы и, когда await
ed, блокирует текущую, пока всеиз них завершено.Это позволяет await get_req_fn(batch, session)
приостанавливать чтение до тех пор, пока не будет загружен весь пакет.
Наконец, single_req
может выглядеть следующим образом:
async def single_req(url, session):
try:
async with session.get(url) as resp:
text = await resp.text()
# process text, or save it to a file, etc
except IOError as e:
print(f'error fetching {url}: {e}')
Все функции принимают объект сеанса создан в main()
, потому что создание новой сессии для каждого запроса настоятельно не рекомендуется в документации.
Наконец, чтобы запустить все это, используйте что-то вроде:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())