Запись веб-ответов в файл в асинхронной программе - PullRequest
5 голосов
/ 08 марта 2019

Работает над заменой моей реализации инструмента запросов к серверу, который использует ThreadPoolExecutors на все асинхронные вызовы с использованием asyncio и aiohttp.Большая часть перехода проста, так как сетевые вызовы не блокируют ввод-вывод, это - загадка ответов, что ставит меня в тупик.

Все примеры, которые я использую, даже документы для обеих библиотек, используютasyncio.gather(), которая собирает все ожидаемые результаты.В моем случае это могут быть файлы размером в несколько ГБ, и я не хочу хранить их в памяти.

Какой подходящий способ решить эту проблему?Использовать ли asyncio.as_completed(), а затем:

for f in as_completed(aws):
    earliest_result = await f
    # Assumes `loop` defined under `if __name__` block outside coroutine
    loop = get_event_loop()
    # Run the blocking IO in an exectuor and write to file
    _ = await loop.run_in_executor(None, save_result, earliest_result)

Разве это не приводит к созданию потока (если я по умолчанию использую ThreadPoolExecutor), что делает его асинхронным, мультимногопоточная программа или асинхронная однопоточная программа?

Далее, гарантирует ли это, что в файл в любое время записывается только 1 earliest_result?Я не хочу, чтобы выполнялся вызов await loop.run_in_executor(...), затем приходит другой результат, и я пытаюсь запустить тот же файл;Полагаю, я мог бы ограничиться семафорами.

Ответы [ 2 ]

1 голос
/ 23 мая 2019

Я бы предложил использовать aiohttp Streaming API . Записывайте свои ответы прямо на диск вместо оперативной памяти и возвращайте имена файлов вместо самих ответов из набора. При этом вообще не будет использоваться много памяти. Это небольшая демонстрация того, что я имею в виду:

import asyncio

import aiofiles
from aiohttp import ClientSession


async def make_request(session, url):
    response = await session.request(method="GET", url=url)
    filename = url.split('/')[-1]
    async for data in response.content.iter_chunked(1024):
        async with aiofiles.open(filename, "ba") as f:
            await f.write(data)
    return filename


async def main():
    urls = ['https://github.com/Tinche/aiofiles',
            'https://github.com/aio-libs/aiohttp']
    async with ClientSession() as session:
        coros = [make_request(session, url) for url in urls]
        result_files = await asyncio.gather(*coros)
    print(result_files)


asyncio.run(main())
0 голосов
/ 08 марта 2019

В моем случае это могут быть файлы размером в несколько ГБ, и я не хочу хранить их в памяти.

Если я прав и в вашем коде одиночный aws означает загрузку одного файла, вы можете столкнуться со следующей проблемой: в то время как as_completed позволяет обмениваться данными из оперативной памяти на жесткий диск как можно скореевсе ваши aws работают параллельно, сохраняя все свои данные (буфер с частично загруженным файлом) в ОЗУ одновременно.

Чтобы избежать этого, вам нужно использовать семафор, чтобы гарантировать, что не много файлов загружаются параллельно впервое место, таким образом, чтобы предотвратить чрезмерное использование ОЗУ.

Вот пример использования семафор .

Разве это не вводит поток (если по умолчанию я использую ThreadPoolExecutor) что делает эту асинхронную многопотоковую программу более чем асинхронной однопоточной программой?

Я не уверен, я понимаю ваш вопрос, но да, ваш код будет использовать потоки, нотолько save_result будет выполняться внутри этих потоков.Весь другой код все еще выполняется в одном основном потоке.Ничего плохого в этом нет.

Далее, гарантирует ли это, что в файл в любое время записывается только 1 ранее самый ранний результат?

Да, это [*].Чтобы быть точно ключевым словом await в последней строке вашего фрагмента, это будет гарантировано:

_ = await loop.run_in_executor(None, save_result, earliest_result)

Вы можете прочитать это как: «Начните выполнение run_in_executor асинхронно и приостановите поток выполнения в этой строке до run_in_executorсделано и возвращен результат ".


[*] Да, если вы не запускаете несколько циклов для f in as_completed(aws) параллельно во-первых.

...