Нужна помощь в распараллеливании этого кода - PullRequest
0 голосов
/ 21 мая 2018

Я натолкнулся (буквально) на распараллеливание следующего кода Python, и мне действительно может понадобиться некоторая помощь.

Прежде всего, ввод представляет собой файл CSV, состоящий из списка ссылок на веб-сайты, которые мне нужны.очищать с помощью функции scrape_function().Исходный код выглядит следующим образом и отлично работает

with open('C:\\links.csv','r') as source:
    reader=csv.reader(source)
    inputlist=list(reader)

m=[]

for i in inputlist:
    m.append(scrape_code(re.sub("\'|\[|\]",'',str(i)))) #remove the quotes around the link strings otherwise it results in URLError

print(m)

Затем я попытался распараллелить этот код, используя joblib следующим образом:

from joblib import Parallel, delayed
import multiprocessing

with open('C:\\links.csv','r') as source:
        reader=csv.reader(source)
        inputlist=list(reader)

cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=cores)(delayed(m.append(scrape_code(re.sub("\'|\[|\]",'',str(i))))) for i in inputlist)

Однако это приведет к странной ошибке:

  File "C:\Users\...\joblib\pool.py", line 371, in send
    CustomizablePickler(buffer, self._reducers).dump(obj)
AttributeError: Can't pickle local object 'delayed.<locals>.delayed_function'

Есть идеи, что я здесь не так сделал?Если я попытаюсь поместить добавление в отдельную функцию, как показано ниже, ошибка исчезнет, ​​но выполнение будет зависать и зависать на неопределенное время:

def process(k):
    a=[]
    a.append(scrape_code(re.sub("\'|\[|\]",'',str(k))))
    return a

cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=cores)(delayed(process)(i) for i in inputlist)

В списке ввода 10000 страниц, поэтому параллельная обработкабыть огромным преимуществом.

1 Ответ

0 голосов
/ 21 мая 2018

Если вам действительно это нужно в отдельных процессах, самый простой способ - просто создать пул процессов и позволить ему распределять ссылки на вашу функцию, например:

import csv
from multiprocessing import Pool

if __name__ == "__main__":  # multiprocessing guard
    with open("c:\\links.csv", "r", newline="") as f:  # open the CSV
        reader = csv.reader(f)  # create a reader
        links = [r[0] for r in reader]  # collect only the first column
    with Pool() as pool:  # create a pool, it will make a pool with all your CPU cores...
        results = pool.map(scrape_code, links)  # distribute your links to scrape_code
    print(results)

ПРИМЕЧАНИЕ. Я предполагаю, что ваша links.csv фактически содержит ссылку в своем первом столбце в зависимости от того, как вы предварительно обрабатываете ссылки в своем коде.

Однако, как я уже говорил в своем комментарии, это не обязательно должно быть быстрее, чем обычные потоки, поэтому я сначала попробую использовать потоки.К счастью, модуль multiprocessing включает в себя макет многопоточного интерфейса , поэтому вам просто нужно заменить from multiprocessing import Pool на from multiprocessing.dummy import Pool и посмотреть, в каком режиме ваш код работает быстрее.

...