Объединение itertools и многопроцессорности? - PullRequest
12 голосов
/ 05 сентября 2011

У меня есть 256x256x256 массив Numpy, в котором каждый элемент является матрицей. Мне нужно сделать некоторые вычисления для каждой из этих матриц, и я хочу использовать модуль multiprocessing, чтобы ускорить процесс.

Результаты этих вычислений должны быть сохранены в массиве 256x256x256, как и исходный, так что результат матрицы в элементе [i,j,k] в исходном массиве должен быть помещен в элемент [i,j,k] нового массив.

Для этого я хочу создать список, который можно записать псевдоизображением как [array[i,j,k], (i, j, k)], и передать его функции, которая будет «мультипроцессорной». Предполагая, что matrices - это список всех матриц, извлеченных из исходного массива, а myfunc - это функция, выполняющая вычисления, код будет выглядеть примерно так:

import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations...
    ...

    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))

Однако, похоже, map_async на самом деле сначала создает этот огромный finput -лист: мои ЦП не слишком много работают, но память и подкачка полностью расходуются в считанные секунды, что, очевидно, не то, что Я хочу.

Есть ли способ передать этот огромный список многопроцессорной функции без необходимости его явного предварительного создания? Или вы знаете другой способ решения этой проблемы?

Спасибо большое! : -)

Ответы [ 3 ]

11 голосов
/ 05 сентября 2011

Все multiprocessing.Pool.map* методы потребляют итераторы полностью (демонстрационный код) , как только вызывается функция. Для подачи фрагментов функции карты итератора по одному фрагменту за раз, используйте grouper_nofill:

def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
    '''
    it=iter(iterable)
    def take():
        while 1: yield list(itertools.islice(it,n))
    return iter(take().next,[])

chunksize=256
async_results=[]
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results=np.array(async_results)

PS. Параметр pool.map_async chunksize делает что-то другое: он разбивает итерируемое на куски, а затем передает каждый кусок рабочему процессу, который вызывает map(func,chunk). Это может дать рабочему процессу больше данных для разборки, если func(item) завершается слишком быстро, но это не помогает в вашей ситуации, поскольку итератор по-прежнему полностью используется сразу после вызова map_async.

2 голосов
/ 14 января 2014

Я тоже столкнулся с этой проблемой.вместо этого:

res = p.map(func, combinations(arr, select_n))

do

res = p.imap(func, combinations(arr, select_n))

imap не использует его!

0 голосов
/ 05 сентября 2011

Pool.map_async() необходимо знать длину итерируемого для отправки работы нескольким работникам.* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}}}

* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 1005с __len__.
...