Asyncio запросы с использованием многопоточности - PullRequest
0 голосов
/ 03 марта 2020

У меня большой список компаний, и я звоню в REST API, чтобы получить ежедневную цену акций для каждой компании. Детали хранятся в базе данных PostgreSQL. Основная функция выглядит следующим образом:

async def get_data_asynchronous():
conn = await asyncpg.connect(**DBConn)

path = 'path'
source = pd.read_excel(io=path + 'companies.xlsx', sheet_name='data')

retries = Retry(total=2, backoff_factor=1, status_forcelist=[404, 502, 503, 504])
dates = pd.date_range('2015-01-01', '2019-12-01', freq='D').strftime("%d-%m-%Y").tolist() 

with ThreadPoolExecutor(max_workers=10) as executor:
    with requests.Session() as session:
        session.mount('https://', HTTPAdapter(max_retries=retries))

        loop = asyncio.get_event_loop()

        for index, inputrow in source.iterrows():
            try:
                if int(inputrow['rowid']) > 0:
                    compid = inputrow['compid'].lower().strip()

                    tasks = [
                        loop.run_in_executor(
                            executor,
                            fetch,
                            *(session, compid, datetime.datetime.strptime(str(dates[i-1]), '%d-%m-%Y'), datetime.datetime.strptime(str(dates[i]), '%d-%m-%Y'))
                        )
                        for i in range(len(dates))
                    ]

                    for content in await asyncio.gather(*tasks):
                        if content is not None:
                            for data in content:
                                compid = data.get('compid', '')
                                date = data.get('date', '')
                                stock_price = data.get('sprice', '')

                                try:
                                    await conn.execute('''
                                                    INSERT INTO comp_dailyhistory VALUES($1, $2, $3)
                                                    ''', compid, date, stock_price)
                                except Exception as e:
                                    print('ERROR')
                                    pass
                    pass
            except Exception as e:
                print(str(e))
                pass

В приведенной выше функции я сначала получаю список компаний из листа Excel (источник) и создаю список дат. Поскольку в моем списке более 200 тыс. Компаний, я создаю ThreadPoolExecutor из 10 рабочих. Цель состоит в том, чтобы передать идентификатор каждой компании (compid) и две последовательные даты из диапазона дат в функцию «выборки» в асинхронном режиме, чтобы ускорить весь процесс сбора данных. Функция извлечения выглядит следующим образом:

def fetch(session, compid, start, stop):
base_url = 'baseurl'

try:
    with session.get(base_url + 'compid=' + compid + '&begin=' + str(int(start.timestamp())) + '&end=' + str(int(stop.timestamp())), timeout=None) as data:
        content = []

        if data.status_code == 200:
            for item in data.json():
                ret = {'compid': compid, 'date': str(date), 'sprice': sprice}
                content.append(ret)
            return content
        else:
            return None
except Exception as e:
    return None 

Функция извлечения использует request.get для получения списка цен на акции компании между датами начала и окончания, анализирует ответ JSON в список ключей. -значение пар и возвращает их вызывающей функции. Возвращенные списки затем выбираются функцией asyncio.gather в вызывающей функции, где каждая цена акции сохраняется в postgreSQL с использованием asyncpg. Остальная часть кода выглядит следующим образом:

def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()    

Основная проблема с этой настройкой заключается в том, что сценарий, похоже, не подбирает полный набор цен для данной компании. Например, для compid = 1 должно быть ровно 600 ежедневных цен. Тем не менее, каждый раз, когда запускается скрипт, я получаю другой результат, который всегда ниже истинного значения. Например, я получаю 550 дневных цен в первом запуске, 570 во втором запуске, 540 в третьем запуске и т. Д. ...

Почему мой сценарий не может получить полный список 600 ежедневных цен? Некоторые мои запросы так или иначе отбрасываются? Я попробовал альтернативу с запросами aiohttp, но не добился большого прогресса.

У меня нет опыта в многопоточном программировании, особенно в asyncio, и я действительно был бы признателен за любую помощь в этом отношении? Спасибо заранее за ваше время.

1 Ответ

1 голос
/ 03 марта 2020

Я выполнил несколько проектов, связанных с очисткой веб-сайтов, чтобы получать тысячи цен на акции каждый день. Проблема, как предположил Дано, связана с вашей обработкой ошибок:

except Exception as e:
    return None

Это ничего не делает для обработки неудачных запросов. Вы можете добавить неудачные URL-адреса в список, и в конце вашего скрипта снова запустить функцию «get» с этими URL-адресами. Если ваша информация важна, вы даже можете определить функцию, которая пытается по крайней мере 5-10 раз загрузить информацию об акции, прежде чем она вернет значение Нет.

Более подробно о многопоточности вы должны быть осторожны с количеством запросов в секунду / минуту / час и избегать превышения лимита скорости API / веб-сайта. Для этого вы можете использовать несколько прокси.

Надеюсь, это поможет.

...