Загрузка большого файла по частям с использованием нескольких параллельных потоков - PullRequest
1 голос
/ 26 октября 2019

У меня есть сценарий использования, когда большой удаленный файл необходимо загружать по частям, используя несколько потоков. Каждый поток должен работать одновременно (параллельно), захватывая определенную часть файла. Ожидается, что части будут объединены в один (оригинальный) файл, как только все части будут успешно загружены.

Возможно, использование библиотеки запросов может помочь, но тогда я не уверен, каким образом я бы многопоточил эторешение, которое объединяет куски вместе.

url = 'https://url.com/file.iso'
headers = {"Range": "bytes=0-1000000"}  # first megabyte
r = get(url, headers=headers)

Я также думал об использовании curl, где Python будет координировать загрузки, но я не уверен, что это правильный путь. Это просто кажется слишком сложным и отклоняется от ванильного решения Python. Примерно так:

curl --range 200000000-399999999 -o file.iso.part2

Может кто-нибудь объяснить, как бы вы поступили примерно так? Или опубликовать пример кода чего-то, что работает в Python 3? Я обычно нахожу связанные с Python ответы довольно легко, но решение этой проблемы, похоже, ускользает от меня.

Ответы [ 2 ]

1 голос
/ 26 октября 2019

Вот версия, использующая Python 3 с Asyncio, это всего лишь пример, ее можно улучшить, но вы сможете получить все, что вам нужно.

  • get_size: отправить ГОЛОВУзапросить размер файла
  • download_range: загрузка одного фрагмента
  • download: загрузка всех фрагментов и объединение их
import asyncio
import concurrent.futures
import requests
import os


URL = 'https://file-examples.com/wp-content/uploads/2017/04/file_example_MP4_1920_18MG.mp4'
OUTPUT = 'video.mp4'


async def get_size(url):
    response = requests.head(url)
    size = int(response.headers['Content-Length'])
    return size


def download_range(url, start, end, output):
    headers = {'Range': f'bytes={start}-{end}'}
    response = requests.get(url, headers=headers)

    with open(output, 'wb') as f:
        for part in response.iter_content(1024):
            f.write(part)


async def download(executor, url, output, chunk_size=1000000):
    loop = asyncio.get_event_loop()

    file_size = await get_size(url)
    chunks = range(0, file_size, chunk_size)

    tasks = [
        loop.run_in_executor(
            executor,
            download_range,
            url,
            start,
            start + chunk_size - 1,
            f'{output}.part{i}',
        )
        for i, start in enumerate(chunks)
    ]

    await asyncio.wait(tasks)

    with open(output, 'wb') as o:
        for i in range(len(chunks)):
            chunk_path = f'{output}.part{i}'

            with open(chunk_path, 'rb') as s:
                o.write(s.read())

            os.remove(chunk_path)


if __name__ == '__main__':
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(
            download(executor, URL, OUTPUT)
        )
    finally:
        loop.close()
1 голос
/ 26 октября 2019

Вы можете использовать grequests для параллельной загрузки.

import grequests

URL = 'https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.1.0-amd64-netinst.iso'
CHUNK_SIZE = 104857600  # 100 MB
HEADERS = []

_start, _stop = 0, 0
for x in range(4):  # file size is > 300MB, so we download in 4 parts. 
    _start = _stop
    _stop = 104857600 * (x + 1)
    HEADERS.append({"Range": "bytes=%s-%s" % (_start, _stop)})


rs = (grequests.get(URL, headers=h) for h in HEADERS)
downloads = grequests.map(rs)

with open('/tmp/debian-10.1.0-amd64-netinst.iso', 'ab') as f:
    for download in downloads:
        print(download.status_code)
        f.write(download.content)

PS: Я не проверял, правильно ли определены диапазоны и совпадает ли загруженная сумма md5! В общем, это должно показать, как это может работать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...