Как запустить функции параллельно, используя многопроцессорность, перебирая аргументы функций в python? - PullRequest
1 голос
/ 28 марта 2020

У меня есть произвольная функция, аргумент которой является файлом, скажем, func(file(i)). У меня есть разные каталоги, скажем, folder_id1, folder_id2, ..., folder_idn. Внутри каждого каталога есть некоторое число files(i), например, внутри folder_id2 есть файлы file(1), file(2), file(3) - три файла - каждый каталог имеет разное количество файлов.

Я хочу запустить функцию func(file(i)) для каждой file(i) внутри указанной папки c параллельно, используя multiprocessing. Так было бы что-то вроде этого:

def runInParallel(funcs):
  proc = []
  for func in funcs:
    p = Process(target=func)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()


def foo(folder_id(i)):
    runInParallel(func(file(1)), func(file(2)), ..., func(file(n)))

Однако я не знаю, как установить все функции func(file(i)) одновременно в качестве аргумента функции runInParallel(). Можно было бы перебрать файлы file(n) внутри папки folder_id(i), но тогда он не будет работать параллельно.

Есть идеи? Locals()?

Спасибо

Редактировать:

def interface(id, number):
    directory_name = f"{str(id)}_site_{str(number)}"


    relative_path = os.getcwd()

    file_path = os.path.join(relative_path, "output")
    output_directory = os.path.join(file_path, directory_name)
    os.chdir(output_directory)
    df = np.load('Principal.npy')
    print(df.shape)

    model_directory = os.path.join(relative_path, "modelos", directory_name)
    paths = [os.path.join(model_directory, filename) for filename in os.listdir(model_directory)]
    file_path = [path for path in paths if os.path.isfile(path)]


    f.runInparallel(paths, tms.desagregadorLSTM(, df=df))

1 Ответ

1 голос
/ 28 марта 2020

Один из способов go - получить полные пути всех файлов, представленных в данном folder_id(i). Затем вы можете передать эти file_paths и func в runInParallel, который затем применяет func к каждому файлу параллельно. , Вы также должны изменить функцию func, чтобы она могла принимать file_path в качестве аргумента. Вот код, который облегчает эту задачу,

import os
from multiprocessing import Process

def runInParallel(file_paths, func):
    proc = []
    for path in file_paths:
        p = Process(target=func, args=(path,))
        p.start()
        proc.append(p)

    for p in proc:
        p.join()


def foo(folder_id):
    paths = [os.path.join(folder_id, filename) for filename in os.listdir(folder_id)]
    file_paths = [path for path in paths if os.path.isfile(path)]

    runInParallel(file_paths, func)

ИЛИ Вместо Process вы можете использовать multiprocessing.Pool, чтобы облегчить эту задачу,

def runInParallel(file_paths, func):
    with Pool() as pool:
        results = pool.map(func, file_paths)

ОБНОВЛЕНИЕ (Согласно вашим комментариям):

, если modelo является файловым объектом, обновите свой метод desagregadorLSTM как,

def desagregadorLSTM(path, df, medicoes=96): 
    with open(path, "r") as modelo:
        model = load_model(modelo) 
        model.summary() 
        df = df 
        X, y = f.separar_interface(df, n_steps=40)
        X = X.reshape(X.shape[0], 2, 20, X.shape[2]) 
        y = y.reshape(y.shape[0], 40, 1) 
        test_predictions = model.predict(X).flatten() 
        y = y.flatten()

Вызов,

runInParallel(file_paths, tms.desagregadorLSTM)

Ало обновление,

p = Process(target=func, args=(path, df), kwargs={"medicoes": 96 })
...