Копируются ли общие данные только для чтения в разные процессы для многопроцессорной обработки? - PullRequest
48 голосов
/ 05 апреля 2011

Кусок кода, который у меня есть, выглядит примерно так:

glbl_array = # a 3 Gb array

def my_func( args, def_param = glbl_array):
    #do stuff on args and def_param

if __name__ == '__main__':
  pool = Pool(processes=4)
  pool.map(my_func, range(1000))

Есть ли способ убедиться (или поощрить), что разные процессы не получают копию glbl_array, а делится ею,Если нет способа остановить копирование, я воспользуюсь массивом memmapped, но мои шаблоны доступа не очень регулярны, поэтому я ожидаю, что массивы memmapped будут работать медленнее.Выше было похоже на первое, что нужно попробовать.Это на Linux.Я просто хотел получить совет от Stackoverflow и не хочу раздражать сисадмина.Как вы думаете, это поможет, если второй параметр является подлинным неизменным объектом, таким как glbl_array.tostring().

Ответы [ 4 ]

103 голосов
/ 05 апреля 2011

Вы можете легко использовать совместно используемую память из multiprocessing вместе с Numpy:

import multiprocessing
import ctypes
import numpy as np

shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(10, 10)

#-- edited 2015-05-01: the assert check below checks the wrong thing
#   with recent versions of Numpy/multiprocessing. That no copy is made
#   is indicated by the fact that the program prints the output shown below.
## No copy was made
##assert shared_array.base.base is shared_array_base.get_obj()

# Parallel processing
def my_func(i, def_param=shared_array):
    shared_array[i,:] = i

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    pool.map(my_func, range(10))

    print shared_array

, которая печатает <pre>[[ 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.] [ 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [ 2. 2. 2. 2. 2. 2. 2. 2. 2. 2.] [ 3. 3. 3. 3. 3. 3. 3. 3. 3. 3.] [ 4. 4. 4. 4. 4. 4. 4. 4. 4. 4.] [ 5. 5. 5. 5. 5. 5. 5. 5. 5. 5.] [ 6. 6. 6. 6. 6. 6. 6. 6. 6. 6.] [ 7. 7. 7. 7. 7. 7. 7. 7. 7. 7.] [ 8. 8. 8. 8. 8. 8. 8. 8. 8. 8.] [ 9. 9. 9. 9. 9. 9. 9. 9. 9. 9.]]

Однако в Linux есть копия-записать семантику на fork(), поэтому даже без использования multiprocessing.Array данные не будут скопированы, пока они не будут записаны.

5 голосов
/ 05 января 2016

Следующий код работает на Win7 и Mac (возможно на linux, но не тестировался).

import multiprocessing
import ctypes
import numpy as np

#-- edited 2015-05-01: the assert check below checks the wrong thing
#   with recent versions of Numpy/multiprocessing. That no copy is made
#   is indicated by the fact that the program prints the output shown below.
## No copy was made
##assert shared_array.base.base is shared_array_base.get_obj()

shared_array = None

def init(shared_array_base):
    global shared_array
    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
    shared_array = shared_array.reshape(10, 10)

# Parallel processing
def my_func(i):
    shared_array[i, :] = i

if __name__ == '__main__':
    shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)

    pool = multiprocessing.Pool(processes=4, initializer=init, initargs=(shared_array_base,))
    pool.map(my_func, range(10))

    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
    shared_array = shared_array.reshape(10, 10)
    print shared_array
2 голосов
/ 13 февраля 2015

Для тех, кто застрял в Windows, которая не поддерживает fork() (если не используется CygWin), pv-ответ не работает. Глобальные переменные недоступны для дочерних процессов.

Вместо этого вы должны передать совместно используемую память во время инициализации Pool / Process следующим образом:

#! /usr/bin/python

import time

from multiprocessing import Process, Queue, Array

def f(q,a):
    m = q.get()
    print m
    print a[0], a[1], a[2]
    m = q.get()
    print m
    print a[0], a[1], a[2]

if __name__ == '__main__':
    a = Array('B', (1, 2, 3), lock=False)
    q = Queue()
    p = Process(target=f, args=(q,a))
    p.start()
    q.put([1, 2, 3])
    time.sleep(1)
    a[0:3] = (4, 5, 6)
    q.put([4, 5, 6])
    p.join()

(это не тупой код и не хороший код, но он иллюстрирует суть; -)

0 голосов
/ 22 января 2017

Если вы ищете вариант, который эффективно работает в Windows и хорошо работает для нерегулярных шаблонов доступа, ветвления и других сценариев, где вам может потребоваться анализ различных матриц на основе комбинации матрицы совместно используемой памяти и процесса. локальные данные, набор инструментов mathDict в пакете ParallelRegression был разработан для решения именно этой ситуации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...