Я не думаю, что KeyError
исключения связаны с тем, как структурирован ваш код.
Чтобы воспроизвести ваши результаты, я высмеял ваши вызовы API, используя asyncio.sleep()
:
import asyncio
class API:
def __init__(self, r):
self.r = r
async def get_some_data(self, i):
await asyncio.sleep(3)
return {'key_{}'.format(i) : 'Data_{}___Row_{}'.format(i, self.r)}
async def get_single_row(api):
tasks = []
tasks.append(asyncio.ensure_future(api.get_some_data(0)))
tasks.append(asyncio.ensure_future(api.get_some_data(1)))
resp = await asyncio.gather(*tasks)
data_0 = resp[0]
data_1 = resp[1]
extra_data_0 = data_0['key_0']
extra_data_1 = data_1['key_1']
return (extra_data_0, extra_data_1)
async def get_rows_working(rows):
data = []
for r in rows:
api = API(r)
data.append(await get_single_row(api))
return data
async def get_rows_not_working(rows):
tasks = []
for r in rows:
api = API(r)
tasks.append(asyncio.ensure_future(get_single_row(api)))
data = await asyncio.gather(*tasks)
return data
Затем добавили таймер и запустили обе функции, чтобы понять, что происходит:
import time
class Timer:
def __enter__(self):
self.start = time.perf_counter()
return self
def __exit__(self, *args):
self.end = time.perf_counter()
self.interval = self.end - self.start
loop = asyncio.get_event_loop()
rows = range(10)
with Timer() as t:
res = loop.run_until_complete(get_rows_working(rows))
print("get_rows_working() result : {}".format(res))
print('get_rows_working() API call took %.03f sec.\n' % t.interval)
with Timer() as t:
res = loop.run_until_complete(get_rows_not_working(rows))
print("get_rows_not_working() result : {}".format(res))
print('get_rows_not_working() API call took %.03f sec.' % t.interval)
Выход:
get_rows_working() result : [('Data_0___Row_0', 'Data_1___Row_0'), ('Data_0___Row_1', 'Data_1___Row_1'), ('Data_0___Row_2', 'Data_1___Row_2'), ('Data_0___Row_3', 'Data_1___Row_3'), ('Data_0___Row_4', 'Data_1___Row_4'), ('Data_0___Row_5', 'Data_1___Row_5'), ('Data_0___Row_6', 'Data_1___Row_6'), ('Data_0___Row_7', 'Data_1___Row_7'), ('Data_0___Row_8', 'Data_1___Row_8'), ('Data_0___Row_9', 'Data_1___Row_9')]
get_rows_working() API call took 30.034 sec.
get_rows_not_working() result : [('Data_0___Row_0', 'Data_1___Row_0'), ('Data_0___Row_1', 'Data_1___Row_1'), ('Data_0___Row_2', 'Data_1___Row_2'), ('Data_0___Row_3', 'Data_1___Row_3'), ('Data_0___Row_4', 'Data_1___Row_4'), ('Data_0___Row_5', 'Data_1___Row_5'), ('Data_0___Row_6', 'Data_1___Row_6'), ('Data_0___Row_7', 'Data_1___Row_7'), ('Data_0___Row_8', 'Data_1___Row_8'), ('Data_0___Row_9', 'Data_1___Row_9')]
get_rows_not_working() API call took 3.008 sec.
Что означает, что второйфункция get_rows_not_working()
фактически работает как положено и одновременно вызывает API.
Возможно ли, что вы получаете KeyError
исключений, потому что API возвращает пустые данные, когда вы превышаете ограничение частоты запросов? Например, если API реализован как:
MAX_CONCUR_ROWS = 5
class API:
connections = 0
def __init__(self, r):
self.r = r
async def get_some_data(self, i):
API.connections += 1
await asyncio.sleep(3)
if API.connections > MAX_CONCUR_ROWS * 2:
res = {}
else:
res = {'key_{}'.format(i) : 'Data_{}___Row_{}'.format(i, self.r)}
API.connections -= 1
return res
, то get_rows_not_working()
вернет KeyError: 'key_0'
, тогда как get_rows_working()
работает нормально.
Если это так, то вы можете ограничить свои запросы, пакетируя их или используя asyncio.Semaphore
:
async def get_single_row(api, semaphore):
# using tasks instead of coroutines won't work because asyncio.ensure_future() starts the coroutine, so the semaphore won't have any effect.
coros = []
coros.append(api.get_some_data(0))
coros.append(api.get_some_data(1))
async with semaphore:
resp = await asyncio.gather(*coros)
data_0 = resp[0]
data_1 = resp[1]
extra_data_0 = data_0['key_0']
extra_data_1 = data_1['key_1']
return (extra_data_0, extra_data_1)
async def get_rows_not_working(rows):
semaphore = asyncio.Semaphore(MAX_CONCUR_ROWS)
tasks = []
for r in rows:
api = API(r)
tasks.append(asyncio.ensure_future(get_single_row_coros(api, semaphore)))
data = await asyncio.gather(*tasks)
return data
Приведенный выше код не 't выполняет более 5 одновременных вызовов одновременно и возвращает ожидаемый результат (обратите внимание, что теперь вместо 3 требуется 6 секунд):
get_rows_not_working() result : [('Data_0___Row_0', 'Data_1___Row_0'), ('Data_0___Row_1', 'Data_1___Row_1'), ('Data_0___Row_2', 'Data_1___Row_2'), ('Data_0___Row_3', 'Data_1___Row_3'), ('Data_0___Row_4', 'Data_1___Row_4'), ('Data_0___Row_5', 'Data_1___Row_5'), ('Data_0___Row_6', 'Data_1___Row_6'), ('Data_0___Row_7', 'Data_1___Row_7'), ('Data_0___Row_8', 'Data_1___Row_8'), ('Data_0___Row_9', 'Data_1___Row_9')]
get_rows_not_working() API call took 6.013 sec.