Я хотел бы запустить большое количество HTTP-запросов и собрать их результаты, как только все они вернутся.Отправка запросов неблокирующим способом возможна с asyncio
, но у меня возникают проблемы со сбором их результатов.
Мне известны такие решения, как aiohttp , которые сделаны для этого.конкретная проблема.Но HTTP-запросы являются лишь примером, мой вопрос заключается в том, как правильно использовать asyncio
.
На стороне сервера у меня есть фляга, которая отвечает на каждый запрос localhost/
с помощью «Hello World!»,но он ждет 0,1 секунды, прежде чем ответить.Во всех моих примерах я отправляю 10 запросов.Синхронный код должен занимать около 1 секунды, асинхронная версия может сделать это за 0,1 секунды.
На стороне клиента я хочу ускорить много запросов одновременно и собрать их результаты.Я пытаюсь сделать это тремя разными способами.Поскольку asyncio нужен исполнитель, чтобы обойти блокировку кода, все подходы вызывают loop.run_in_executor
.
Этот код разделен между ними:
import requests
from time import perf_counter
import asyncio
loop = asyncio.get_event_loop()
async def request_async():
r = requests.get("http://127.0.0.1:5000/")
return r.text
def request_sync():
r = requests.get("http://127.0.0.1:5000/")
return r.text
Подход 1:
Используйте asyncio.gather()
в списке задач, а затем run_until_complete
.После прочтения Asyncio.gather vs asyncio.wait казалось, что сборщик будет ждать результатов.Но это не так.Таким образом, этот код возвращается мгновенно, не дожидаясь завершения запросов.Если я использую здесь функцию блокировки, это работает.Почему я не могу использовать асинхронную функцию?
# approach 1
start = perf_counter()
tasks = []
for i in range(10):
tasks.append(loop.run_in_executor(None, request_async)) # <---- using async function !
gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}") # 0.003
# approach 1(B)
start = perf_counter()
tasks = []
for i in range(10):
tasks.append(loop.run_in_executor(None, request_sync)) # <---- using sync function
gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}") # 0.112
Python даже предупреждает меня, что coroutine "request_async"
никогда не ожидалось.На данный момент у меня есть рабочее решение: использование обычной (не асинхронной) функции в исполнителе.Но я хотел бы иметь решение, которое работает с async
определениями функций.Потому что я хотел бы использовать await
внутри них (в этом простом примере это не нужно, но если я переместлю больше кода в asyncio
, я уверен, что это станет важным).
Подход 2:
Python предупреждает меня, что мои сопрограммы никогда не ожидаются.Итак, давайте подождем их.Подход 2 оборачивает весь код во внешнюю асинхронную функцию и ожидает результата от сбора.Та же проблема, также мгновенно возвращается (тоже самое предупреждение):
# approach 2
async def main():
tasks = []
for i in range(10):
tasks.append(loop.run_in_executor(None, request_async))
gathered_tasks = asyncio.gather(*tasks)
return await gathered_tasks # <-------- here I'm waiting on the coroutine
start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}") # 0.0036
Это действительно смутило меня.Я жду на результат gather
.Интуитивно это должно быть распространено на сопрограммы, которые я собираю.Но python все еще жалуется, что моя сопрограмма никогда не ожидается.
Я прочитал еще немного и обнаружил: Как я могу использовать запросы в asyncio?
Это в значительной степени точно мойпример: объединение requests
и asyncio
.Что подводит меня к подходу 3:
Подход 3:
Та же структура, что и для подхода 2, но подождите каждую задачу, которая была дана run_in_executor()
индивидуально (наверняка этосчитается ожидающим сопрограммы):
# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():
tasks = []
for i in range(10):
task = loop.run_in_executor(None, request_async)
tasks.append(task)
responses = []
for task in tasks:
response = await task
responses.append(response)
return responses
start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}") # 0.004578
Мой вопрос таков: я хочу иметь блокирующий код в моих сопрограммах и запускать их параллельно с исполнителем.Как мне получить их результаты?