Как я могу убедиться, что только 4 экземпляра функции работают (используя многопроцессорность)? - PullRequest
0 голосов
/ 07 октября 2018

Я использую пакеты python 3.6.6 и requests и bs4 для загрузки и анализа некоторого контента, сейчас я загружаю несколько больших файлов >1gb и использую только одно соединение, это довольно медленно, поэтомуЯ хочу ускорить выполнение нескольких загрузок одновременно.

Важные фрагменты кода:

def download(dir, link, name):
    r = requests.get(url, stream=True)
    with open(f'{path}/{filename}', 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)

files = [{'link':'http://...','filename':'somename.7z'}]
download_dir= '~/Downloads'

for file in files:
    #do some things to check if file['link'] is valid and that the file dosen't already exist
    download(download_dir, file['link'], file['filename'])

Что я хотел бы сделать, это запустить то, что находится в цикле вparralel, если быть точным, то, что в цикле выполняется 4 раза одновременно.

Моя первая попытка сделать что-то подобное использовала multiprocessing.Pool.map, например:

def download(dir, link, name):
    r = requests.get(url, stream=True)
    with open(f'{path}/{filename}', 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)

files = [{'link':'http://...','filename':'somename.7z'}]
download_dir= '~/Downloads'

datas = [{'file':f, 'dir':download_dir} for f in files]

worker(data)
    file = data['file']
    download_dir = data['dir']
    #do some things to check if file['link'] is valid and that the file dosen't already exist
    download(download_dir, file['link'], file['filename'])

pool = multiprocessing.Pool(4)
pool.map(worker, datas)

Это, к сожалению, неОн не работал и запустил более 4 загрузок одновременно, я предполагаю, что он использовал 4 потока, но каждый раз, когда поток достигал предела сети, и ни один из старых не был дальше, он просто запускал очередной рабочий.

В попытке заставить мою программу делать то, что я хочу, я попробовал этот хакерский способ вещей:

def download(dir, link, name):
    r = requests.get(url, stream=True)
    with open(f'{path}/{filename}', 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)

files = [{'link':'http://...','filename':'somename.7z'}]
download_dir= '~/Downloads'

worker(file, download_dir)
    #do some things to check if file['link'] is valid and that the file dosen't already exist
    download(download_dir, file['link'], file['filename'])

index = 0
while index < len(files):
    pool = multiprocessing.Pool(4)

    for _ in range(4):
        if index < len(files): #check exists cause I'm incrementing index in the inner for loop
            pool.apply_async(worker, (files[index], download_dir,))
        index += 1

    pool.close()
    pool.join()

Но pool.close() не ждал завершения всех представленных задач, вместо этого он прервал downloads и, соответственно, также не позволяли возобновить выполнение задач, которые были переданы в пул после их отмены.

Каков будет правильный способ сделать это?

1 Ответ

0 голосов
/ 07 октября 2018

Ваш код содержит ряд логических и стилевых ошибок.Самым большим является то, что вы создаете новый пул для каждого элемента вашего входного списка!Должен быть только один пул.

if (index < len(urllist)) всегда верно, потому что вы внутри while index < len(urlist).

pool = multiprocessing.Pool(processes=size) - плохой стиль;вы должны использовать with (он же «контекстный менеджер») везде, где это возможно.Это избавит от необходимости звонков pool.close() и pool.join().Например:

with multiprocessing.Pool(processes=size) as pool:
    pool.apply_async(...)

Числа для циклов считаются плохим стилем в Python, когда их легко избежать.Вам не понадобится шаблон index = index + 1, если вы просто используете for url in urllist: или аналогичный.

Наконец, вам вообще не нужно выполнять цикл, потому что pool.map()pool.map_async()) существует и можетсделать все это одним вызовом функции.

...