Выход из Async Generator в Python AsyncIO - PullRequest
0 голосов
/ 15 мая 2018

У меня есть простой класс, который использует асинхронный генератор для получения списка URL:

import aiohttp
import asyncio
import logging
import sys

LOOP = asyncio.get_event_loop()
N_SEMAPHORE = 3

FORMAT = '[%(asctime)s] - %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

class ASYNC_GENERATOR(object):
    def __init__(self, n_semaphore=N_SEMAPHORE, loop=LOOP):
        self.loop = loop
        self.semaphore = asyncio.Semaphore(n_semaphore)
        self.session = aiohttp.ClientSession(loop=self.loop)

    async def _get_url(self, url):
        """
        Sends an http GET request to an API endpoint
        """

        async with self.semaphore:
            async with self.session.get(url) as response:
                logger.info(f'Request URL: {url} [{response.status}]')
                read_response = await response.read()

                return {
                    'read': read_response,
                    'status': response.status,
                }

    def get_routes(self, urls):
        """
        Wrapper around _get_url (multiple urls asynchronously)

        This returns an async generator
        """

        # Asynchronous http GET requests
        coros = [self._get_url(url) for url in urls]
        futures = asyncio.as_completed(coros)
        for future in futures:
            yield self.loop.run_until_complete(future)

    def close(self):
        self.session._connector.close()

Когда я выполняю эту основную часть кода:

if __name__ == '__main__':
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    for response in responses:
        response = next(ag.get_routes(['https://httpbin.org/get']))
    ag.close()

Журналвыводит на печать:

[2018-05-15 12:59:49,228] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 12:59:49,235] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 12:59:49,242] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 12:59:49,285] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 12:59:49,290] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 12:59:49,295] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 12:59:49,335] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 12:59:49,340] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 12:59:49,347] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 12:59:49,388] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 12:59:49,394] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,444] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,503] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,553] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,603] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,650] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,700] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,825] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,875] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,922] - Request URL: https://httpbin.org/get [200]

Поскольку responses является асинхронным генератором, я ожидаю, что он выдаст один ответ от асинхронного генератора (который должен отправлять запрос только при фактической выдаче), отправив отдельный запросконечная точка без параметра x, а затем выдает следующий ответ от асинхронного генератора.Это должно переключаться между запросом с параметром x и запросом без параметров.Вместо этого он выдает все ответы от асинхронного генератора с параметром x, а затем следуют все запросы https, которые не имеют параметров.

Когда я делаю, происходит нечто подобное:

ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
responses = ag.get_routes(urls)
next(responses)
response = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()

И журнал печатает:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,656] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 13:08:38,681] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 13:08:38,695] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 13:08:38,717] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 13:08:38,741] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 13:08:38,750] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 13:08:38,773] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 13:08:38,792] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 13:08:38,803] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

Вместо этого я хочу:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

Бывают моменты, когда я хочу сначала получить все ответы, прежде чем делать что-либо еще.,Однако бывают также случаи, когда я хочу вставлять и делать промежуточные запросы перед выдачей следующего элемента из генератора (т. Е. Генератор возвращает результаты из разбитых на страницы результатов поиска, и я хочу обработать дальнейшие ссылки с каждой страницы перед переходом на следующую страницу).).

Что мне нужно изменить для достижения необходимого результата?

1 Ответ

0 голосов
/ 15 мая 2018

Оставляя в стороне технический вопрос о том, является ли responses асинхронным генератором (это не так, поскольку Python использует термин ), ваша проблема заключается в as_completed.as_completed запускает группу сопрограмм параллельно и предоставляет средства для получения их результатов по завершении.То, что фьючерсы выполняются параллельно, не совсем очевидно из документации , но имеет смысл, если учесть, что исходный concurrent.futures.as_completed работает с фьючерсами на основе потоков, у которых нет выбора но для параллельной работы.Концептуально, то же самое относится и к асинхронному будущему.

Ваш код получает только первый (самый быстрый) результат, а затем начинает делать что-то еще, также используя asyncio.Остальные сопрограммы, переданные в as_completed, не замораживаются просто потому, что никто не собирает их результаты - они выполняют свою работу в фоновом режиме, и когда они будут готовы, они будут await ed (в вашем случае с помощью кода внутри as_completed, доступ к которому осуществляется с помощью loop.run_until_complete()).Рискну предположить, что для получения URL-адреса без параметров требуется больше времени, чем для URL-адреса только с параметром x, поэтому он печатается после всех других сопрограмм.

Другими словами, эти строки журналанапечатанный означает, что asyncio выполняет свою работу и обеспечивает запрошенное параллельное выполнение!Если вам не нужно параллельное выполнение, не запрашивайте его, выполняйте их последовательно:

def get_routes(self, urls):
    for url in urls:
        yield loop.run_until_complete(self._get_url(url))

Но это плохой способ использования asyncio - его основной цикл не реентерабелен, так чтодля обеспечения возможности компоновки вы почти наверняка захотите, чтобы цикл запускался один раз на верхнем уровне.Обычно это делается с помощью такой конструкции, как loop.run_until_complete(main()) или loop.run_forever().Как отметил Мартин, вы могли бы достичь этого, сохранив API приятного генератора, сделав get_routes фактическим асинхронным генератором:

async def get_routes(self, urls):
    for url in urls:
        result = await self._get_url(url)
        yield result

Теперь вы можете иметь сопрограмму main(), которая выглядит следующим образом:

async def main():
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    async for response in responses:
        # simulate `next` with async iteration
        async for other_response in ag.get_routes(['https://httpbin.org/get']):
            break
    ag.close()

loop.run_until_complete(main())
...