параллельная рекурсивная функция в python? - PullRequest
6 голосов
/ 28 августа 2011

Как распараллелить рекурсивную функцию в Python?

Моя функция выглядит следующим образом:

def f(x, depth):
    if x==0:
        return ...
    else :
        return [x] + map(lambda x:f(x, depth-1), list_of_values(x))

def list_of_values(x):
    # heavy compute, pure function

При попытке распараллелить его с multiprocessing.Pool.map окна открывают бесконечное число процессов и зависают.

Что хорошо (желательно простой) способ распараллелить его (для одной многоядерной машины)?

Вот код, который зависает:

from multiprocessing import Pool
pool = pool(processes=4) 
def f(x, depth):
    if x==0:
        return ...
    else :
        return [x] + pool.map(lambda x:f(x, depth-1), list_of_values(x))

def list_of_values(x):
    # heavy compute, pure function

Ответы [ 2 ]

5 голосов
/ 28 августа 2011

хорошо, извините за проблемы с этим.

Я собираюсь ответить на немного другой вопрос, где f() возвращает сумму значений в списке. это потому, что из вашего примера мне не ясно, каким будет тип возвращаемого значения f(), а использование целого числа делает код простым для понимания.

это сложно, потому что параллельно происходят две разные вещи:

  1. расчет дорогой функции в пуле
  2. рекурсивное расширение f()

Я очень осторожен, чтобы использовать только пул для вычисления дорогой функции. таким образом, мы не получаем «взрыв» процессов. но поскольку это асинхронно, нам нужно отложить много работы для обратного вызова, который рабочий вызывает после выполнения дорогостоящей функции.

более того, нам нужно использовать защелку обратного отсчета, чтобы мы знали, когда все отдельные суб-вызовы к f() завершены.

может быть более простой способ (я почти уверен, что есть, но мне нужно делать другие вещи), но, возможно, это дает вам представление о том, что возможно:

from multiprocessing import Pool, Value, RawArray, RLock
from time import sleep

class Latch:

    '''A countdown latch that lets us wait for a job of "n" parts'''

    def __init__(self, n):
        self.__counter = Value('i', n)
        self.__lock = RLock()

    def decrement(self):
        with self.__lock:
            self.__counter.value -= 1
            print('dec', self.read())
        return self.read() == 0

    def read(self):
        with self.__lock:
            return self.__counter.value

    def join(self):
        while self.read():
            sleep(1)


def list_of_values(x):
    '''An expensive function'''
    print(x, ': thinking...')
    sleep(1)
    print(x, ': thought')
    return list(range(x))


pool = Pool()


def async_f(x, on_complete=None):
    '''Return the sum of the values in the expensive list'''
    if x == 0:
        on_complete(0) # no list, return 0
    else:
        n = x # need to know size of result beforehand
        latch = Latch(n) # wait for n entires to be calculated
        result = RawArray('i', n+1) # where we will assemble the map
        def delayed_map(values):
            '''This is the callback for the pool async process - it runs
               in a separate thread within this process once the
               expensive list has been calculated and orchestrates the
               mapping of f over the result.'''
            result[0] = x # first value in list is x
            for (v, i) in enumerate(values):
                def callback(fx, i=i):
                    '''This is the callback passed to f() and is called when 
                       the function completes.  If it is the last of all the
                       calls in the map then it calls on_complete() (ie another
                       instance of this function) for the calling f().'''
                    result[i+1] = fx
                    if latch.decrement(): # have completed list
                        # at this point result contains [x]+map(f, ...)
                        on_complete(sum(result)) # so return sum
                async_f(v, callback)
        # Ask worker to generate list then call delayed_map
        pool.apply_async(list_of_values, [x], callback=delayed_map)


def run():
    '''Tie into the same mechanism as above, for the final value.'''
    result = Value('i')
    latch = Latch(1)
    def final_callback(value):
        result.value = value
        latch.decrement()
    async_f(6, final_callback)
    latch.join() # wait for everything to complete
    return result.value


print(run())

ps я использую python3.2, и уродство выше, потому что мы откладываем вычисление окончательных результатов (возвращаясь к дереву) на потом. возможно, что-то вроде генераторов или фьючерсов может упростить вещи.

также я подозреваю, что вам нужен кеш, чтобы избежать ненужного пересчета дорогой функции при вызове с тем же аргументом, что и ранее.

см. Также ответ Янива - параллельная рекурсивная функция в python? - который, как представляется, является альтернативным способом изменить порядок вычисления путем явного указания глубины.

2 голосов
/ 29 августа 2011

Подумав об этом, я нашел простой, не полный, но достаточно хороший ответ:

# a partially parallel solution , just do the first level of recursion in paralell. it might be enough work to fill all cores.
import multiprocessing 

def f_helper(data):
     return f(x=data['x'],depth=data['depth'], recursion_depth=data['recursion_depth'])

def f(x, depth, recursion_depth):
    if depth==0:
        return ...
    else :
        if recursion_depth == 0:
            pool = multiprocessing.Pool(processes=4)
            result = [x] + pool.map(f_helper, [{'x':_x, 'depth':depth-1,  'recursion_depth':recursion_depth+1 } _x in list_of_values(x)])
            pool.close()
        else:
            result = [x] + map(f_helper, [{'x':_x, 'depth':depth-1, 'recursion_depth':recursion_depth+1 } _x in list_of_values(x)])


        return result 

def list_of_values(x):
    # heavy compute, pure function
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...