как сделать асинхронные вызовы API, вложенные в sure_future и собранные в Python Asyncio? - PullRequest
0 голосов
/ 24 октября 2019

Я пытаюсь сделать асинхронные вызовы API, вложенные с помощью asyncio, с помощью sure_future () и collect ().

Я пробовал два способа заставить это работать. Прежде всего API написан с помощью aiohttp и работает нормально. Я пробовал это с двумя методами (здесь с именем get_rows_working () и get_rows_not_working ()). Один работает, а другой нет.

Одна строка всегда выполняет свои вызовы API параллельно для увеличения скорости. Теперь я пытаюсь вытащить все строки параллельно.

async def get_single_row(api):
    tasks = []
    tasks.append(asyncio.ensure_future(api.get_some_data())
    tasks.append(asyncio.ensure_future(api.get_some_data2())
    resp = await asyncio.gather(*tasks)
    data = resp[0]
    data2 = resp[1]


    extra_data = data['some_key']
    extra_data2 = data2['some_key2']
    return (extra_data, extra_data2)

async def get_rows_working(rows):
    data = []
    for r in rows:
        api = API(r)
        data.append(await get_single_row(api))
    return data

async def get_rows_not_working(rows):
    tasks = []
    for r in rows:
        api = API(r)
        tasks.append(asyncio.ensure_future(get_single_row(api)))
    data = await asyncio.gather(*tasks)
    return data


loop = asyncio.get_event_loop()
loop.run_until_complete(get_rows_working())
loop.run_until_complete(get_rows_not_working())

Что произойдет, если вы начнете их вкладывать? Потому что я начинаю получать KeyErrors в этих строках (чего у меня нет с get_rows_working ()):

extra_data = data['some_key']
extra_data2 = data2['some_key2']

Что заставляет меня верить, что внутренний порядок операций становится все более проблематичным из-за их вложенности. Я не уверен, как это лучше описать, извините.

Это даже правильный способ добиться этого? Спасибо за любые ответы.

1 Ответ

0 голосов
/ 01 ноября 2019

Я не думаю, что KeyError исключения связаны с тем, как структурирован ваш код.

Чтобы воспроизвести ваши результаты, я высмеял ваши вызовы API, используя asyncio.sleep():

import asyncio

class API:
    def __init__(self, r):
        self.r = r

    async def get_some_data(self, i):
        await asyncio.sleep(3)
        return {'key_{}'.format(i) : 'Data_{}___Row_{}'.format(i, self.r)}

async def get_single_row(api):
    tasks = []
    tasks.append(asyncio.ensure_future(api.get_some_data(0)))
    tasks.append(asyncio.ensure_future(api.get_some_data(1)))

    resp = await asyncio.gather(*tasks)
    data_0 = resp[0]
    data_1 = resp[1]

    extra_data_0 = data_0['key_0']
    extra_data_1 = data_1['key_1']
    return (extra_data_0, extra_data_1)

async def get_rows_working(rows):
    data = []
    for r in rows:
        api = API(r)
        data.append(await get_single_row(api))
    return data

async def get_rows_not_working(rows):
    tasks = []
    for r in rows:
        api = API(r)
        tasks.append(asyncio.ensure_future(get_single_row(api)))
    data = await asyncio.gather(*tasks)
    return data

Затем добавили таймер и запустили обе функции, чтобы понять, что происходит:

import time
class Timer:    
    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, *args):
        self.end = time.perf_counter()
        self.interval = self.end - self.start

loop = asyncio.get_event_loop()
rows = range(10)

with Timer() as t:
    res = loop.run_until_complete(get_rows_working(rows))
    print("get_rows_working() result : {}".format(res))
print('get_rows_working() API call took %.03f sec.\n' % t.interval)

with Timer() as t:
    res = loop.run_until_complete(get_rows_not_working(rows))
    print("get_rows_not_working() result : {}".format(res))
print('get_rows_not_working() API call took %.03f sec.' % t.interval)

Выход:

get_rows_working() result : [('Data_0___Row_0', 'Data_1___Row_0'), ('Data_0___Row_1', 'Data_1___Row_1'), ('Data_0___Row_2', 'Data_1___Row_2'), ('Data_0___Row_3', 'Data_1___Row_3'), ('Data_0___Row_4', 'Data_1___Row_4'), ('Data_0___Row_5', 'Data_1___Row_5'), ('Data_0___Row_6', 'Data_1___Row_6'), ('Data_0___Row_7', 'Data_1___Row_7'), ('Data_0___Row_8', 'Data_1___Row_8'), ('Data_0___Row_9', 'Data_1___Row_9')]
get_rows_working() API call took 30.034 sec.

get_rows_not_working() result : [('Data_0___Row_0', 'Data_1___Row_0'), ('Data_0___Row_1', 'Data_1___Row_1'), ('Data_0___Row_2', 'Data_1___Row_2'), ('Data_0___Row_3', 'Data_1___Row_3'), ('Data_0___Row_4', 'Data_1___Row_4'), ('Data_0___Row_5', 'Data_1___Row_5'), ('Data_0___Row_6', 'Data_1___Row_6'), ('Data_0___Row_7', 'Data_1___Row_7'), ('Data_0___Row_8', 'Data_1___Row_8'), ('Data_0___Row_9', 'Data_1___Row_9')]
get_rows_not_working() API call took 3.008 sec.

Что означает, что второйфункция get_rows_not_working() фактически работает как положено и одновременно вызывает API.

Возможно ли, что вы получаете KeyError исключений, потому что API возвращает пустые данные, когда вы превышаете ограничение частоты запросов? Например, если API реализован как:

MAX_CONCUR_ROWS = 5
class API:
    connections = 0
    def __init__(self, r):
        self.r = r

    async def get_some_data(self, i):
        API.connections += 1
        await asyncio.sleep(3)
        if API.connections > MAX_CONCUR_ROWS * 2:
            res = {}
        else:
            res = {'key_{}'.format(i) : 'Data_{}___Row_{}'.format(i, self.r)}
        API.connections -= 1
        return res

, то get_rows_not_working() вернет KeyError: 'key_0', тогда как get_rows_working() работает нормально.

Если это так, то вы можете ограничить свои запросы, пакетируя их или используя asyncio.Semaphore:

async def get_single_row(api, semaphore):
    # using tasks instead of coroutines won't work because asyncio.ensure_future() starts the coroutine, so the semaphore won't have any effect.
    coros = []
    coros.append(api.get_some_data(0))
    coros.append(api.get_some_data(1))

    async with semaphore:
        resp = await asyncio.gather(*coros)
    data_0 = resp[0]
    data_1 = resp[1]

    extra_data_0 = data_0['key_0']
    extra_data_1 = data_1['key_1']
    return (extra_data_0, extra_data_1)

async def get_rows_not_working(rows):
    semaphore = asyncio.Semaphore(MAX_CONCUR_ROWS)
    tasks = []
    for r in rows:
        api = API(r)
        tasks.append(asyncio.ensure_future(get_single_row_coros(api, semaphore)))
    data = await asyncio.gather(*tasks)
    return data

Приведенный выше код не 't выполняет более 5 одновременных вызовов одновременно и возвращает ожидаемый результат (обратите внимание, что теперь вместо 3 требуется 6 секунд):

get_rows_not_working() result : [('Data_0___Row_0', 'Data_1___Row_0'), ('Data_0___Row_1', 'Data_1___Row_1'), ('Data_0___Row_2', 'Data_1___Row_2'), ('Data_0___Row_3', 'Data_1___Row_3'), ('Data_0___Row_4', 'Data_1___Row_4'), ('Data_0___Row_5', 'Data_1___Row_5'), ('Data_0___Row_6', 'Data_1___Row_6'), ('Data_0___Row_7', 'Data_1___Row_7'), ('Data_0___Row_8', 'Data_1___Row_8'), ('Data_0___Row_9', 'Data_1___Row_9')]
get_rows_not_working() API call took 6.013 sec. 
...