aiohttp: ограничение скорости запросов в секунду по типу домена - PullRequest
1 голос
/ 06 мая 2020

Я уже смотрел здесь . Но я все еще не могу об этом подумать. Вот как я сейчас это делаю:

urls_without_rate_limit = 
    [
       'http://httpbin.org/get'
       'http://httpbin.org/get',
       'http://httpbin.org/get',
       'http://httpbin.org/get',
       'http://httpbin.org/get'
    ]

urls_with_rate_limit = 
    [
       'http://eu.httpbin.org/get'
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get'
    ]

api_rate = 2
api_limit = 6

loop = asyncio.get_event_loop()
    loop.run_until_complete(
        process(urls=urls_without_rate_limit, rate=0, limit=len(url_list)))

    loop.run_until_complete(
        process(urls=urls_with_rate_limit, rate=api_rate, limit=api_limit))
async def process(urls, rate, limit):
    limit = asyncio.Semaphore(limit)

    f = Fetch(
        rate=rate,
        limit=limit
    )

    tasks = []
    for url in urls:
        tasks.append(f.make_request(url=url))

    results = await asyncio.gather(*tasks)

Как вы можете видеть, он завершит sh первый раунд process, а затем начнет второй раунд для ограничения скорости.

Работает нормально, но есть ли способ запустить оба раунда одновременно с разными ограничениями скорости?

твм

1 Ответ

1 голос
/ 08 мая 2020

Я уточню то, что я прокомментировал. Так что вы можете попробовать разработать собственное решение (хотя я приведу здесь полный код).

У вас может быть словарь, определяющий некоторые правила (api -> ограничение скорости в секунду):

APIS_RATE_LIMIT_PER_S = {
  "http://api.mathjs.org/v4?precision=5": 1,
  "http://api.mathjs.org/v4?precision=2": 3,
}

Который затем можно использовать, чтобы решить, какой семафор выбрать в соответствии с URL-адресом запроса ( на практике вам придется выполнить некоторый синтаксический анализ, чтобы получить конечные точки, которыми вы хотите управлять). Как только вы это сделаете, вам просто нужно использовать семафор, чтобы убедиться, что вы ограничиваете количество одновременных процессов, выполняющих ваш запрос. Последний кусок головоломки, очевидно, - добавить задержку перед выпуском семафора.

Я получу другую версию того, что предлагается здесь , но это в основном то же самое решение. Я только что сделал так, чтобы вы могли изменить объект сеанса, чтобы каждый вызов session.get автоматически применял контроль ограничения скорости.

def set_rate_limits(session, apis_rate_limits_per_s):
    semaphores = {api: asyncio.Semaphore(s) for api, s in apis_rate_limits_per_s.items()}

    @asynccontextmanager
    async def limit_rate(url):
        await semaphores[url].acquire() 
        start = time.time()
        try:
            yield semaphores[url]
        finally:
            duration = time.time() - start
            await asyncio.sleep(1 - duration)
            semaphores[url].release()

    def add_limit_rate(coroutine):

        async def coroutine_with_rate_limit(url, *args, **kwargs):
            async with limit_rate(url):
                return await coroutine(url, *args, **kwargs)

        return coroutine_with_rate_limit

    session.get = add_limit_rate(session.get)
    session.post = add_limit_rate(session.post)
    return session

Обратите внимание, что с помощью add_limit_rate вы можете добавить контроль ограничения скорости к любой сопрограмме, у которой конечная точка API является первым аргументом. Но здесь мы просто изменим session.get и session.post.

В конце вы можете использовать функцию set_rate_limits следующим образом:

async def main():
    apis = APIS_RATE_LIMIT_PER_S.keys()
    params = [
        {"expr" : "2^2"},
        {"expr" : "1/0.999"},
        {"expr" : "1/1.001"},
        {"expr" : "1*1.001"},
    ]
    async with aiohttp.ClientSession() as session:
        session = set_rate_limits(session, APIS_RATE_LIMIT_PER_S)
        api_requests = [get_text_result(session, url, params=p) for url, p  in product(apis, params)]
        text_responses = await asyncio.gather(*api_requests)
        print(text_responses)


async def get_text_result(session, url, params=None):
    result = await session.get(url, params=params)
    return await result.text()

Если вы запустите этот код, вы не будете Чтобы увидеть большую часть того, что происходит, вы можете добавить некоторые print здесь и там в set_rate_limits, чтобы «убедиться», что ограничение скорости применяется правильно:

import time

# [...] change this part : 
    def add_limit_rate(coroutine):

        async def coroutine_with_rate_limit(url, *args, **kwargs):
            async with limit_rate(url):
                ######### debug 
                global request_count
                request_count += 1
                this_req_id = request_count
                rate_lim = APIS_RATE_LIMIT_PER_S[url]
                print(f"request #{this_req_id} -> \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
                ########
                r = await coroutine(url, *args, **kwargs)

            ######### debug 
            print(f"request #{this_req_id} <- \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
            ######### 
            return r

Если вы запустите этот пример asyncio.run(main()) , вы должны получить что-то вроде:

request #1 ->        1ms     rate 1/s
request #2 ->        2ms     rate 3/s
request #3 ->        3ms     rate 3/s
request #4 ->        3ms     rate 3/s
request #1 <-     1003ms     rate 1/s
request #2 <-     1004ms     rate 3/s
request #3 <-     1004ms     rate 3/s
request #5 ->     1004ms     rate 1/s
request #6 ->     1005ms     rate 3/s
request #4 <-     1006ms     rate 3/s
request #5 <-     2007ms     rate 1/s
request #6 <-     2007ms     rate 3/s
request #7 ->     2007ms     rate 1/s
request #7 <-     3008ms     rate 1/s
request #8 ->     3008ms     rate 1/s
request #8 <-     4010ms     rate 1/s

Кажется, здесь соблюдается ограничение скорости, в частности, мы можем взглянуть на API с лимитом скорости 1 запрос в секунду:

request #1 ->        1ms     rate 1/s
request #1 <-     1003ms     rate 1/s
request #5 ->     1004ms     rate 1/s
request #5 <-     2007ms     rate 1/s
request #7 ->     2007ms     rate 1/s
request #7 <-     3008ms     rate 1/s
request #8 ->     3008ms     rate 1/s
request #8 <-     4010ms     rate 1/s

С другой стороны, это решение не очень удовлетворительно, поскольку мы искусственно добавляем пинг в 1 секунду ко всем нашим запросам. Это из-за этой части кода:

await asyncio.sleep(1 - duration)
semaphores[url].release()

Проблема здесь в том, что мы ждем, пока сон завершится sh, прежде чем передать управление обратно событию l oop (планирование другого задача, другой запрос). Это можно легко решить, используя вместо этого этот фрагмент кода:

asyncio.create_task(release_after_delay(semaphores[url], 1 - duration))    

С release_after_delay просто:

async def release_after_delay(semaphore, delay):
    await asyncio.sleep(delay)
    semaphore.release()

Функция asyncio.create_task делает сопрограмма "запустить это в фоновом режиме". Это означает, что в этом коде семафор будет выпущен позже, но нам не нужно ждать, пока он вернет управление четному l oop (что означает, что можно запланировать другой запрос, а также что мы можем получить результат в add_limit_rate). Другими словами, нас не волнует результат этой сопрограммы, мы просто хотим, чтобы она запускалась в какой-то момент в будущем (вероятно, поэтому эта функция раньше вызывала ensure_future).

Используя этот патч, мы имеем следующее для API с ограничением скорости, установленным на один запрос в секунду:

request #1 ->        1ms     rate 1/s
request #1 <-      214ms     rate 1/s
request #2 ->     1002ms     rate 1/s
request #2 <-     1039ms     rate 1/s
request #3 ->     2004ms     rate 1/s
request #3 <-     2050ms     rate 1/s
request #4 ->     3009ms     rate 1/s
request #4 <-     3048ms     rate 1/s

Это определенно ближе к тому, что мы ожидаем от этого кода. Мы получаем каждый ответ от нашего API, как только можем (в этом примере пинг составляет 200 мс / 37 мс / 46 мс / 41 мс). И ограничение скорости тоже соблюдается.

Это, наверное, не самый красивый код, но он может стать началом для вашей работы. Может быть, сделайте чистый пакет с этим, как только он у вас будет хорошо работать, я думаю, это то, что другие люди могут захотеть использовать.

...