Я использую пакеты python 3.6.6 и requests
и bs4
для загрузки и анализа некоторого контента, сейчас я загружаю несколько больших файлов >1gb
и использую только одно соединение, это довольно медленно, поэтомуЯ хочу ускорить выполнение нескольких загрузок одновременно.
Важные фрагменты кода:
def download(dir, link, name):
r = requests.get(url, stream=True)
with open(f'{path}/{filename}', 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
files = [{'link':'http://...','filename':'somename.7z'}]
download_dir= '~/Downloads'
for file in files:
#do some things to check if file['link'] is valid and that the file dosen't already exist
download(download_dir, file['link'], file['filename'])
Что я хотел бы сделать, это запустить то, что находится в цикле вparralel, если быть точным, то, что в цикле выполняется 4 раза одновременно.
Моя первая попытка сделать что-то подобное использовала multiprocessing.Pool.map
, например:
def download(dir, link, name):
r = requests.get(url, stream=True)
with open(f'{path}/{filename}', 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
files = [{'link':'http://...','filename':'somename.7z'}]
download_dir= '~/Downloads'
datas = [{'file':f, 'dir':download_dir} for f in files]
worker(data)
file = data['file']
download_dir = data['dir']
#do some things to check if file['link'] is valid and that the file dosen't already exist
download(download_dir, file['link'], file['filename'])
pool = multiprocessing.Pool(4)
pool.map(worker, datas)
Это, к сожалению, неОн не работал и запустил более 4 загрузок одновременно, я предполагаю, что он использовал 4 потока, но каждый раз, когда поток достигал предела сети, и ни один из старых не был дальше, он просто запускал очередной рабочий.
В попытке заставить мою программу делать то, что я хочу, я попробовал этот хакерский способ вещей:
def download(dir, link, name):
r = requests.get(url, stream=True)
with open(f'{path}/{filename}', 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
files = [{'link':'http://...','filename':'somename.7z'}]
download_dir= '~/Downloads'
worker(file, download_dir)
#do some things to check if file['link'] is valid and that the file dosen't already exist
download(download_dir, file['link'], file['filename'])
index = 0
while index < len(files):
pool = multiprocessing.Pool(4)
for _ in range(4):
if index < len(files): #check exists cause I'm incrementing index in the inner for loop
pool.apply_async(worker, (files[index], download_dir,))
index += 1
pool.close()
pool.join()
Но pool.close()
не ждал завершения всех представленных задач, вместо этого он прервал downloads и, соответственно, также не позволяли возобновить выполнение задач, которые были переданы в пул после их отмены.
Каков будет правильный способ сделать это?