Как использовать функции не верхнего уровня при распараллеливании? - PullRequest
0 голосов
/ 11 сентября 2018

Я хотел бы использовать многопроцессорную обработку в ресурсоемких вычислениях в коде, который я пишу, как показано в этом расширенном примере:

import numpy as np
import multiprocessing as multiproc

def function(r, phi, z, params):
    """returns an array of the timepoints and the corresponding values 
       (demanding computation in actual code, with iFFT and stuff)"""
    times = np.array([1.,2.,3.])
    tdependent_vals = r + z * times + phi
    return np.array([times, tdependent_vals])

def calculate_func(rmax, zmax, phi, param):
    rvals = np.linspace(0,rmax,5)
    zvals = np.linspace(0,zmax,5)
    for r in rvals:
        func_at_r = lambda z: function(r, phi, z, param)[1]
        with multiproc.Pool(2) as pool:
             fieldvals = np.array([*pool.map(func_at_r, zvals)])
             print(fieldvals) #for test, it's actually saved in a numpy array

calculate_func(3.,4.,5.,6.)

Если я запускаю это, он завершается неудачно с

AttributeError: Can't pickle local object 'calculate_func.<locals>.<lambda>'

Я думаю, что причина в том, что согласно документации , могут быть выбраны только функции, определенные на верхнем уровне, а моя функция, определенная lambda, не может.Но я не вижу способа, которым я мог бы сделать это автономной функцией, по крайней мере, не загрязняя модуль кучей переменных верхнего уровня: параметры неизвестны до вызова calculate_func, и они меняются на каждой итерациисвыше rvals.Вся эта многопроцессорность совершенно нова для меня, и я не мог придумать альтернативу.Какой был бы самый простой рабочий способ распараллеливания цикла по rvals и zvals?

Примечание: я использовал этот ответ в качестве отправной точки.

1 Ответ

0 голосов
/ 11 сентября 2018

Это, вероятно, не лучший ответ для этого, но это ответ, поэтому, пожалуйста, без ненависти:)

Вы можете просто написать функцию-обертку верхнего уровня, которую можно сериализовать, и заставить ее выполнять функции... Это немного похоже на начало функции, но я решил похожую проблему в своем коде следующим образом.

Вот краткий пример

def wrapper(arg_list, *args):
    func_str = arg_list[0]
    args = arg_list[1]
    code = marshal.loads(base64.b64decode(func_str.data))
    func = types.FunctionType(code, globals(), "wrapped_func")
    return func(*args)

def run_func(func, *args):
    func_str = base64.b64encode(marshal.dumps(func.__code__, 0))
    arg_list = [func_str, args]
    with mp.Pool(2) as pool:
        results = pool.map(wrapper, arg_list)
    return results
...