Распараллеливание / планирование python вызова функции для многих файлов - PullRequest
0 голосов
/ 05 августа 2020

У меня есть несколько сотен тысяч файлов csv, к которым я хотел бы применить ту же функцию. Что-то вроде следующей фиктивной функции:

def process_single_file(fname):
    df = pd.read_csv(fname)
    # Pandas and non-pandas processing
    df.to_csv(f"./output/{fname}")

Поскольку цикл по всем файлам по отдельности занял бы слишком много времени, мой вопрос заключается в том, какой самый эффективный способ запланировать и распараллелить это выполнение - никакие процессы не зависят друг от друга. Я начал с попытки использовать python s multiprocessing:

import multiprocessing

files = sorted(glob.glob("./input/*.csv"))

processes = []
for fname in files:
    p = multiprocessing.Process(target=process_file, args=(fname,))
    processes.append(p)
    p.start()

for process in processes:
    process.join()

Мой компьютер, однако, похоже, не любит этот процесс, поскольку он быстро перегружает все ЦП и приводит к замедлению работы и вылетает. Есть ли более эффективный способ снизить нагрузку на все ЦП и запланировать задачи, такие как использование Dask, какого-то сценария Bash или изменение python? Заранее спасибо.

Ответы [ 3 ]

2 голосов
/ 05 августа 2020

Это действительно зависит от того, где находится ваше узкое место: тратите ли вы больше времени на чтение / запись файлов или на обработку ЦП? 1006 * действительно помог мне узнать обо всем этом, я могу только порекомендовать хорошее прочтение;)

Как объясняется в учебнике, если ввод-вывод, многопоточности будет достаточно (и, возможно, лучше, чем многопроцессорность):

def process_all_files(files):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(process_single_file, files)

А если CPU, многопроцессорность позволит вам использовать все доступные ядра:

def process_all_files(files):
    with multiprocessing.Pool() as pool:
        pool.map(process_single_file, files)
1 голос
/ 05 августа 2020

Можете попробовать Ray , это достаточно эффективный модуль для распараллеливания задач

0 голосов
/ 05 августа 2020

Абсолютно пул - это путь к go. Что-то вроде следующих строк

`из multiprocessing import Pool

def f (x): return x * x

if name == ' main ': pool = Pool (cesses = 4) `

проверьте следующий пост

Использование multiprocessing.Process с максимальным количеством одновременных процессов

...