Как использовать Asyncio с очень длинным списком задач (генератор) - PullRequest
1 голос
/ 21 апреля 2020

У меня есть небольшая программа, которая загружает довольно тяжелый CSV (более 800 МБ, в блоках, используя pandas.read_csv для ограничения использования памяти) и выполняет несколько API-вызовов к серверам «в дикой природе», и, наконец, создает результат объект, который затем сохраняется в базе данных.

Я добавил кеширование для сетевых запросов, где это возможно, но даже в этом случае выполнение кода занимает более 10 часов. Когда я профилирую код с помощью PySpy, большая часть его ожидает сетевых запросов.

Я попытался преобразовать его в asyncio для ускорения работы и сумел заставить код работать на небольшом подмножестве входной файл. Однако с полным файлом использование памяти становится непомерно большим.

Вот что я попробовал:

import pandas as pd
import httpx

async def process_item(item, client):
    # send a few requests with httpx session
    # process results
    await save_results_to_db(res)

async def get_items_from_csv():
    # loads the heavy CSV file
    for chunk in pd.read_csv(filename, ...):
        for row in chunk.itertuples():
            item = item_from_row(row)
            yield item

async def main():
    async with httpx.AsyncClient() as client:
        tasks = []
        for item in get_items_from_csv():
            tasks.append(process_item(item, client))
        await asyncio.gather(*tasks)

asyncio.run(main())

Есть ли способ избежать создания списка tasks, который становится очень тяжелый предмет с более чем 1,5 млн. предметов? Другим недостатком этого является то, что ни одна задача, кажется, не будет обработана, пока не будет прочитан весь файл, что не является идеальным. Я использую python 3.7, но при необходимости могу легко обновить до 3.8.

1 Ответ

1 голос
/ 24 апреля 2020

Я думаю, что вы ищете здесь не для запуска в пакетном режиме, а для запуска N рабочих, которые одновременно вытягивают задачи из очереди.

N = 10  # scale based on the processing power and memory you have

async def main():
    async with httpx.AsyncClient() as client:
        tasks = asyncio.Queue()
        for item in get_items_from_csv():
            tasks.put_nowait(process_item(item, client))

        async def worker():
            while not tasks.empty():
                await tasks.get_nowait()
            # for a server
            # while task := await tasks.get():
            #     await task

        await asyncio.gather(*[worker() for _ in range(N)])

Я использовал asyncio.Queue, но вы также можете просто используйте collections.deque, так как все задачи добавляются в очередь до запуска работника. Первый особенно полезен при запуске рабочих, которые работают в длительном процессе (например, на сервере), где элементы могут помещаться в асинхронную очередь.

...