Параллельная обработка с пулом в Python - PullRequest
1 голос
/ 21 мая 2019

Я попытался запустить параллельную обработку для локально определенной функции следующим образом:

import multiprocessing as mp                                                                                               
import numpy as np
import pdb


def testFunction():                                                                                                        
  x = np.asarray( range(1,10) )
  y = np.asarray( range(1,10) )

  def myFunc( i ):
    return np.sum(x[0:i]) * y[i]

  p = mp.Pool( mp.cpu_count() )
  out = p.map( myFunc, range(0,x.size) )
  print( out )


if __name__ == '__main__':
  print( 'I got here' )                                                                                                         
  testFunction()

При этом я получаю следующую ошибку:

cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Как я могу использовать многопроцессорную обработку для параллельной обработки нескольких массивов, как я пытаюсь сделать здесь? x и y обязательно определены внутри функции; Я бы предпочел не делать их глобальными переменными.

Вся помощь приветствуется.

1 Ответ

2 голосов
/ 22 мая 2019

Просто сделайте функцию обработки глобальной и передайте пары значений массива вместо ссылки на них по индексу в функции:

import multiprocessing as mp

import numpy as np


def process(inputs):
    x, y = inputs

    return x * y


def main():
    x = np.asarray(range(10))
    y = np.asarray(range(10))

    with mp.Pool(mp.cpu_count()) as pool:
        out = pool.map(process, zip(x, y))

    print(out)


if __name__ == '__main__':
    main()

Выход:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

ОБНОВЛЕНИЕ : В соответствии с предоставленными новыми данными, вы должны совместно использовать массивы между различными процессами. Это именно то, для чего используется multiprocessing.Manager.

Объект manager, возвращаемый Manager (), контролирует процесс сервера, который содержит объекты Python и позволяет другим процессам манипулировать ими используя прокси.

Таким образом, полученный код будет выглядеть примерно так:

from functools import partial
import multiprocessing as mp

import numpy as np


def process(i, x, y):
    return np.sum(x[:i]) * y[i]


def main():
    manager = mp.Manager()

    x = manager.Array('i', range(10))
    y = manager.Array('i', range(10))

    func = partial(process, x=x, y=y)

    with mp.Pool(mp.cpu_count()) as pool:
        out = pool.map(func, range(len(x)))

    print(out)


if __name__ == '__main__':
    main()

Выход:

[0, 0, 2, 9, 24, 50, 90, 147, 224, 324]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...