Многократное ожидание в асинхронной функции Python - PullRequest
0 голосов
/ 09 мая 2018

Я использую сеанс aiohttp вместе с семафором в пользовательском классе:

async def get_url(self, url):

    async with self.semaphore:
        async with self.session.get(url) as response:
            try:
                text_response = await response.text()
                read_response = await response.read()
                json_response = await response.json()
                await asyncio.sleep(random.uniform(0.1, 0.5))
            except aiohttp.client_exceptions.ContentTypeError:
                json_response = {}

            return {
                'json': json_response,
                'text': text_response,
                'read': read_response,
                'status': response.status,
                'url': response.url,
            }

У меня два вопроса:

  1. Правильно / неправильно ли иметь несколько операторов ожидания в одной асинхронной функции? Мне нужно вернуть и response.text () и response.read (). Однако, в зависимости от URL, response.json () может быть или не быть доступным, поэтому я бросил все в блок try / Кроме того, чтобы поймать это исключение.

  2. Поскольку я использую эту функцию для циклического прохождения по списку различных конечных точек API RESTful, я контролирую количество одновременных запросов через семафор (не более 100), но мне также нужно разбивать запросы так, чтобы они не регистрируют глушение хост-машины. Итак, я думал, что смогу сделать это, добавив asyncio.sleep, который выбирается случайным образом в интервале 0,1-0,5 секунды. Это лучший способ обеспечить небольшое ожидание между запросами? Должен ли я переместить это в начало функции, а не в конце?

1 Ответ

0 голосов
/ 09 мая 2018
  1. Совершенно нормально иметь несколько ожидающих в одной асинхронной функции, если вы знаете, чего ожидаете, и каждый из них ожидает по очереди, как обычное последовательное выполнение. В aiohttp следует упомянуть одну вещь: вам лучше сначала позвонить read() и тоже поймать UnicodeDecodeError, поскольку внутренне text() и json() сначала вызывают read() и обрабатывают его результат, вам не нужен обработка для предотвращения возврата по крайней мере read_response. Вам не нужно беспокоиться о том, что read() вызывается несколько раз, он просто кэшируется в экземпляре ответа при первом вызове.

  2. Random Stagger - простое и эффективное решение для внезапного трафика. Однако если вы хотите точно контролировать минимальный интервал времени между любыми двумя запросами - по академическим причинам вы можете установить два семафора:

    def __init__(self):
        # something else
        self.starter = asyncio.Semaphore(0)
        self.ender = asyncio.Semaphore(30)
    

    Затем измените get_url(), чтобы использовать их:

    async def get_url(self, url):
        await self.starter.acquire()
        try:
            async with self.session.get(url) as response:
                # your code
        finally:
            self.ender.release()
    

    Поскольку starter был инициализирован с нуля, все сопрограммы get_url() будут блокироваться на starter. Мы будем использовать отдельную сопрограмму для управления ею:

    async def controller(self):
        last = 0
        while self.running:
            await self.ender.acquire()
            sleep = 0.5 - (self.loop.time() - last)  # at most 2 requests per second
            if sleep > 0:
                await asyncio.sleep(sleep)
            last = self.loop.time()
            self.starter.release()
    

    И ваша основная программа должна выглядеть примерно так:

    def run(self):
        for url in [...]:
            self.loop.create_task(self.get_url(url))
        self.loop.create_task(self.controller())
    

    Итак, сначала контроллер отпустит starter 30 раз равномерно за 15 секунд, потому что это начальное значение ender. После этого контроллер выпустит starter, как только закончится любая get_url(), если с момента последнего выпуска starter прошло 0,5 секунды, или он будет ждать до этого времени.

    Одна проблема здесь: если URL-адреса для извлечения не являются постоянным списком в памяти (например, поступают из сети постоянно с непредсказуемыми задержками между URL-адресами), ограничитель RPS выйдет из строя (стартер выпущен слишком рано, прежде чем на самом деле будет URL для извлечения) ). В этом случае вам понадобятся дополнительные настройки, хотя вероятность взрыва трафика уже очень мала.

...