У меня есть небольшая программа, которая загружает довольно тяжелый CSV (более 800 МБ, в блоках, используя pandas.read_csv
для ограничения использования памяти) и выполняет несколько API-вызовов к серверам «в дикой природе», и, наконец, создает результат объект, который затем сохраняется в базе данных.
Я добавил кеширование для сетевых запросов, где это возможно, но даже в этом случае выполнение кода занимает более 10 часов. Когда я профилирую код с помощью PySpy, большая часть его ожидает сетевых запросов.
Я попытался преобразовать его в asyncio для ускорения работы и сумел заставить код работать на небольшом подмножестве входной файл. Однако с полным файлом использование памяти становится непомерно большим.
Вот что я попробовал:
import pandas as pd
import httpx
async def process_item(item, client):
# send a few requests with httpx session
# process results
await save_results_to_db(res)
async def get_items_from_csv():
# loads the heavy CSV file
for chunk in pd.read_csv(filename, ...):
for row in chunk.itertuples():
item = item_from_row(row)
yield item
async def main():
async with httpx.AsyncClient() as client:
tasks = []
for item in get_items_from_csv():
tasks.append(process_item(item, client))
await asyncio.gather(*tasks)
asyncio.run(main())
Есть ли способ избежать создания списка tasks
, который становится очень тяжелый предмет с более чем 1,5 млн. предметов? Другим недостатком этого является то, что ни одна задача, кажется, не будет обработана, пока не будет прочитан весь файл, что не является идеальным. Я использую python 3.7, но при необходимости могу легко обновить до 3.8.