Я уточню то, что я прокомментировал. Так что вы можете попробовать разработать собственное решение (хотя я приведу здесь полный код).
У вас может быть словарь, определяющий некоторые правила (api -> ограничение скорости в секунду):
APIS_RATE_LIMIT_PER_S = {
"http://api.mathjs.org/v4?precision=5": 1,
"http://api.mathjs.org/v4?precision=2": 3,
}
Который затем можно использовать, чтобы решить, какой семафор выбрать в соответствии с URL-адресом запроса ( на практике вам придется выполнить некоторый синтаксический анализ, чтобы получить конечные точки, которыми вы хотите управлять). Как только вы это сделаете, вам просто нужно использовать семафор, чтобы убедиться, что вы ограничиваете количество одновременных процессов, выполняющих ваш запрос. Последний кусок головоломки, очевидно, - добавить задержку перед выпуском семафора.
Я получу другую версию того, что предлагается здесь , но это в основном то же самое решение. Я только что сделал так, чтобы вы могли изменить объект сеанса, чтобы каждый вызов session.get
автоматически применял контроль ограничения скорости.
def set_rate_limits(session, apis_rate_limits_per_s):
semaphores = {api: asyncio.Semaphore(s) for api, s in apis_rate_limits_per_s.items()}
@asynccontextmanager
async def limit_rate(url):
await semaphores[url].acquire()
start = time.time()
try:
yield semaphores[url]
finally:
duration = time.time() - start
await asyncio.sleep(1 - duration)
semaphores[url].release()
def add_limit_rate(coroutine):
async def coroutine_with_rate_limit(url, *args, **kwargs):
async with limit_rate(url):
return await coroutine(url, *args, **kwargs)
return coroutine_with_rate_limit
session.get = add_limit_rate(session.get)
session.post = add_limit_rate(session.post)
return session
Обратите внимание, что с помощью add_limit_rate
вы можете добавить контроль ограничения скорости к любой сопрограмме, у которой конечная точка API является первым аргументом. Но здесь мы просто изменим session.get
и session.post
.
В конце вы можете использовать функцию set_rate_limits
следующим образом:
async def main():
apis = APIS_RATE_LIMIT_PER_S.keys()
params = [
{"expr" : "2^2"},
{"expr" : "1/0.999"},
{"expr" : "1/1.001"},
{"expr" : "1*1.001"},
]
async with aiohttp.ClientSession() as session:
session = set_rate_limits(session, APIS_RATE_LIMIT_PER_S)
api_requests = [get_text_result(session, url, params=p) for url, p in product(apis, params)]
text_responses = await asyncio.gather(*api_requests)
print(text_responses)
async def get_text_result(session, url, params=None):
result = await session.get(url, params=params)
return await result.text()
Если вы запустите этот код, вы не будете Чтобы увидеть большую часть того, что происходит, вы можете добавить некоторые print
здесь и там в set_rate_limits
, чтобы «убедиться», что ограничение скорости применяется правильно:
import time
# [...] change this part :
def add_limit_rate(coroutine):
async def coroutine_with_rate_limit(url, *args, **kwargs):
async with limit_rate(url):
######### debug
global request_count
request_count += 1
this_req_id = request_count
rate_lim = APIS_RATE_LIMIT_PER_S[url]
print(f"request #{this_req_id} -> \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
########
r = await coroutine(url, *args, **kwargs)
######### debug
print(f"request #{this_req_id} <- \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
#########
return r
Если вы запустите этот пример asyncio.run(main())
, вы должны получить что-то вроде:
request #1 -> 1ms rate 1/s
request #2 -> 2ms rate 3/s
request #3 -> 3ms rate 3/s
request #4 -> 3ms rate 3/s
request #1 <- 1003ms rate 1/s
request #2 <- 1004ms rate 3/s
request #3 <- 1004ms rate 3/s
request #5 -> 1004ms rate 1/s
request #6 -> 1005ms rate 3/s
request #4 <- 1006ms rate 3/s
request #5 <- 2007ms rate 1/s
request #6 <- 2007ms rate 3/s
request #7 -> 2007ms rate 1/s
request #7 <- 3008ms rate 1/s
request #8 -> 3008ms rate 1/s
request #8 <- 4010ms rate 1/s
Кажется, здесь соблюдается ограничение скорости, в частности, мы можем взглянуть на API с лимитом скорости 1 запрос в секунду:
request #1 -> 1ms rate 1/s
request #1 <- 1003ms rate 1/s
request #5 -> 1004ms rate 1/s
request #5 <- 2007ms rate 1/s
request #7 -> 2007ms rate 1/s
request #7 <- 3008ms rate 1/s
request #8 -> 3008ms rate 1/s
request #8 <- 4010ms rate 1/s
С другой стороны, это решение не очень удовлетворительно, поскольку мы искусственно добавляем пинг в 1 секунду ко всем нашим запросам. Это из-за этой части кода:
await asyncio.sleep(1 - duration)
semaphores[url].release()
Проблема здесь в том, что мы ждем, пока сон завершится sh, прежде чем передать управление обратно событию l oop (планирование другого задача, другой запрос). Это можно легко решить, используя вместо этого этот фрагмент кода:
asyncio.create_task(release_after_delay(semaphores[url], 1 - duration))
С release_after_delay
просто:
async def release_after_delay(semaphore, delay):
await asyncio.sleep(delay)
semaphore.release()
Функция asyncio.create_task
делает сопрограмма "запустить это в фоновом режиме". Это означает, что в этом коде семафор будет выпущен позже, но нам не нужно ждать, пока он вернет управление четному l oop (что означает, что можно запланировать другой запрос, а также что мы можем получить результат в add_limit_rate
). Другими словами, нас не волнует результат этой сопрограммы, мы просто хотим, чтобы она запускалась в какой-то момент в будущем (вероятно, поэтому эта функция раньше вызывала ensure_future
).
Используя этот патч, мы имеем следующее для API с ограничением скорости, установленным на один запрос в секунду:
request #1 -> 1ms rate 1/s
request #1 <- 214ms rate 1/s
request #2 -> 1002ms rate 1/s
request #2 <- 1039ms rate 1/s
request #3 -> 2004ms rate 1/s
request #3 <- 2050ms rate 1/s
request #4 -> 3009ms rate 1/s
request #4 <- 3048ms rate 1/s
Это определенно ближе к тому, что мы ожидаем от этого кода. Мы получаем каждый ответ от нашего API, как только можем (в этом примере пинг составляет 200 мс / 37 мс / 46 мс / 41 мс). И ограничение скорости тоже соблюдается.
Это, наверное, не самый красивый код, но он может стать началом для вашей работы. Может быть, сделайте чистый пакет с этим, как только он у вас будет хорошо работать, я думаю, это то, что другие люди могут захотеть использовать.