asyncio: сбор результатов от асинхронной функции в исполнителе - PullRequest
0 голосов
/ 12 ноября 2018

Я хотел бы запустить большое количество HTTP-запросов и собрать их результаты, как только все они вернутся.Отправка запросов неблокирующим способом возможна с asyncio, но у меня возникают проблемы со сбором их результатов.

Мне известны такие решения, как aiohttp , которые сделаны для этого.конкретная проблема.Но HTTP-запросы являются лишь примером, мой вопрос заключается в том, как правильно использовать asyncio.

На стороне сервера у меня есть фляга, которая отвечает на каждый запрос localhost/ с помощью «Hello World!»,но он ждет 0,1 секунды, прежде чем ответить.Во всех моих примерах я отправляю 10 запросов.Синхронный код должен занимать около 1 секунды, асинхронная версия может сделать это за 0,1 секунды.

На стороне клиента я хочу ускорить много запросов одновременно и собрать их результаты.Я пытаюсь сделать это тремя разными способами.Поскольку asyncio нужен исполнитель, чтобы обойти блокировку кода, все подходы вызывают loop.run_in_executor.

Этот код разделен между ними:

import requests
from time import perf_counter
import asyncio

loop = asyncio.get_event_loop()

async def request_async():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

def request_sync():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

Подход 1:

Используйте asyncio.gather() в списке задач, а затем run_until_complete.После прочтения Asyncio.gather vs asyncio.wait казалось, что сборщик будет ждать результатов.Но это не так.Таким образом, этот код возвращается мгновенно, не дожидаясь завершения запросов.Если я использую здесь функцию блокировки, это работает.Почему я не могу использовать асинхронную функцию?

# approach 1
start = perf_counter()
tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_async)) # <---- using async function !

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}") # 0.003

# approach 1(B)
start = perf_counter()
tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_sync)) # <---- using sync function

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)

stop = perf_counter()
print(f"finished {stop - start}") # 0.112

Python даже предупреждает меня, что coroutine "request_async" никогда не ожидалось.На данный момент у меня есть рабочее решение: использование обычной (не асинхронной) функции в исполнителе.Но я хотел бы иметь решение, которое работает с async определениями функций.Потому что я хотел бы использовать await внутри них (в этом простом примере это не нужно, но если я переместлю больше кода в asyncio, я уверен, что это станет важным).

Подход 2:

Python предупреждает меня, что мои сопрограммы никогда не ожидаются.Итак, давайте подождем их.Подход 2 оборачивает весь код во внешнюю асинхронную функцию и ожидает результата от сбора.Та же проблема, также мгновенно возвращается (тоже самое предупреждение):

# approach 2
async def main():

    tasks = []
    for i in range(10):
        tasks.append(loop.run_in_executor(None, request_async))

    gathered_tasks = asyncio.gather(*tasks)

    return await gathered_tasks # <-------- here I'm waiting on the coroutine 

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # 0.0036

Это действительно смутило меня.Я жду на результат gather.Интуитивно это должно быть распространено на сопрограммы, которые я собираю.Но python все еще жалуется, что моя сопрограмма никогда не ожидается.

Я прочитал еще немного и обнаружил: Как я могу использовать запросы в asyncio?

Это в значительной степени точно мойпример: объединение requests и asyncio.Что подводит меня к подходу 3:

Подход 3:

Та же структура, что и для подхода 2, но подождите каждую задачу, которая была дана run_in_executor() индивидуально (наверняка этосчитается ожидающим сопрограммы):

# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():

    tasks = []
    for i in range(10):
        task = loop.run_in_executor(None, request_async)
        tasks.append(task)

    responses = []
    for task in tasks:
        response = await task
        responses.append(response)

    return responses

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()

print(f"finished {stop - start}") # 0.004578

Мой вопрос таков: я хочу иметь блокирующий код в моих сопрограммах и запускать их параллельно с исполнителем.Как мне получить их результаты?

1 Ответ

0 голосов
/ 12 ноября 2018

У меня такой вопрос: я хочу, чтобы в моих сопрограммах был блокирующий код, и я выполнял их параллельно с исполнителем. Как мне получить их результаты?

Ответ в том, что вы не должны иметь код блокировки в своих сопрограммах. Если он у вас есть, вы должны выделить его, используя run_in_executor. Таким образом, единственный правильный способ написать request_async (используя requests):

async def request_async():
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, request_sync)

Предоставление request_async на run_in_executor обречено, потому что вся точка из run_in_executor предназначена для вызова функции sync в другом потоке. Если вы дадите ей функцию сопрограммы, она с радостью вызовет ее (в другом потоке) и предоставит возвращенный объект сопрограммы как «результат». Это эквивалентно передаче генератора в код, который ожидает обычную функцию - да, он будет просто вызывать генератор, но не будет знать, что делать с возвращенным объектом.

Что еще более важно, вы не можете просто поставить async перед def и ожидать получить полезную сопрограмму. Сопрограмма не должна блокироваться, кроме как в ожидании другого асинхронного кода.

Теперь, когда у вас есть пригодный для использования request_async, вы можете собирать его результаты следующим образом:

async def main():
    tasks = [request_async() for _i in range(10)]
    results = await asyncio.gather(*tasks)
    return results

results = loop.run_until_complete(main())
...