Как сделать так, чтобы процессы могли записывать в массив основной программы? - PullRequest
5 голосов
/ 16 марта 2012

Я делаю пул процессов, и каждый из них должен писать в разных частях матрицы, которая существует в основной программе.Не существует страха перезаписи информации, поскольку каждый процесс будет работать с разными строками матрицы.Как я могу сделать матрицу доступной для записи из процессов ??

Программа является множителем матрицы, назначенным мне профессором, и должна быть многопроцессорной.Это создаст процесс для каждого ядра компьютера.Основная программа отправит различные части матрицы процессам, и они будут вычислять их, а затем они будут возвращать их таким образом, чтобы я мог определить, какой ответ соответствует тому, на какой строке он был основан.

Ответы [ 4 ]

8 голосов
/ 17 марта 2012

Вы пытались использовать класс multiprocessing.Array для установки некоторой общей памяти?

См. Также пример из документов :

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]

Просто расширьте это до матрицы размером h*w с индексированием в стиле i*w+j.Затем добавьте несколько процессов, используя Пул процессов .

5 голосов
/ 24 марта 2012

Стоимость создания новых процессов или копирования матриц между ними, если процессы используются повторно, затмевает стоимость умножения матриц.В любом случае numpy.dot() может использовать разные ядра процессора.

Умножение матриц может быть распределено между процессами путем вычисления разных строк результата в разных процессах, например, при заданных входных матрицах a и b, а затемЭлемент result (i,j) равен:

out[i,j] = sum(a[i,:] * b[:,j])

Таким образом, i -ая строка может быть вычислена как:

import numpy as np

def dot_slice(a, b, out, i):
    t = np.empty_like(a[i,:])
    for j in xrange(b.shape[1]):
        # out[i,j] = sum(a[i,:] * b[:,j])
        np.multiply(a[i,:], b[:,j], t).sum(axis=1, out=out[i,j])

numpy массив принимает срез в качестве индекса, например,a[1:3,:] возвращает 2-ю и 3-ю строки.

a, b доступны только для чтения, поэтому они могут наследоваться как есть дочерними процессами ( с использованием функции копирования при записи в Linux ), результат вычисляется с использованием разделяемого массива.Во время вычислений копируются только индексы:

import ctypes
import multiprocessing as mp

def dot(a, b, nprocesses=mp.cpu_count()):
    """Perform matrix multiplication using multiple processes."""
    if (a.shape[1] != b.shape[0]):
        raise ValueError("wrong shape")

    # create shared array
    mp_arr = mp.RawArray(ctypes.c_double, a.shape[0]*b.shape[1])

    # start processes
    np_args = mp_arr, (a.shape[0], b.shape[1]), a.dtype
    pool = mp.Pool(nprocesses, initializer=init, initargs=(a, b)+np_args)

    # perform multiplication
    for i in pool.imap_unordered(mpdot_slice, slices(a.shape[0], nprocesses)):
        print("done %s" % (i,))
    pool.close()
    pool.join()

    # return result
    return tonumpyarray(*np_args)

Где:

def mpdot_slice(i):
    dot_slice(ga, gb, gout, i)
    return i

def init(a, b, *np_args):
    """Called on each child process initialization."""
    global ga, gb, gout
    ga, gb = a, b
    gout = tonumpyarray(*np_args)

def tonumpyarray(mp_arr, shape, dtype):
    """Convert shared multiprocessing array to numpy array.

    no data copying
    """
    return np.frombuffer(mp_arr, dtype=dtype).reshape(shape)

def slices(nitems, mslices):
    """Split nitems on mslices pieces.

    >>> list(slices(10, 3))
    [slice(0, 4, None), slice(4, 8, None), slice(8, 10, None)]
    >>> list(slices(1, 3))
    [slice(0, 1, None), slice(1, 1, None), slice(2, 1, None)]
    """
    step = nitems // mslices + 1
    for i in xrange(mslices):
        yield slice(i*step, min(nitems, (i+1)*step))

Чтобы проверить это:

def test():
    n = 100000
    a = np.random.rand(50, n)
    b = np.random.rand(n, 60)
    assert np.allclose(np.dot(a,b), dot(a,b, nprocesses=2))

В Linux эта многопроцессорная версия имеет ту же производительность, что и решение, использующее потоки и высвобождающее GIL (в расширении C) во время вычислений :

$ python -mtimeit -s'from test_cydot import a,b,out,np' 'np.dot(a,b,out)'
100 loops, best of 3: 9.05 msec per loop

$ python -mtimeit -s'from test_cydot import a,b,out,cydot' 'cydot.dot(a,b,out)' 
10 loops, best of 3: 88.8 msec per loop

$ python -mtimeit -s'from test_cydot import a,b; import mpdot' 'mpdot.dot(a,b)'
done slice(49, 50, None)
..[snip]..
done slice(35, 42, None)
10 loops, best of 3: 82.3 msec per loop

Примечание: тест был изменен для использования np.float64 везде.

2 голосов
/ 17 марта 2012

Матричное умножение означает, что каждый элемент полученной матрицы рассчитывается отдельно. Это похоже на работу для Pool . Поскольку это домашнее задание (а также выполнение кода SO), я только проиллюстрирую использование самого пула, а не всего решения.

Итак, вам нужно написать процедуру для вычисления (i, j) -го элемента результирующей матрицы:

def getProductElement(m1, m2, i, j):
    # some calculations
    return element

Затем вы инициализируете пул:

from multiprocessing import Pool, cpu_count
pool = Pool(processes=cpu_count())

Тогда вам нужно отправить работу. Вы также можете организовать их в матрице, но зачем это делать, давайте просто составим список.

result = []
# here you need to iterate through the the columns of the first and the rows of
# the second matrix. How you do it, depends on the implementation (how you store
# the matrices). Also, make sure you check the dimensions are the same.
# The simplest case is if you have a list of columns:

N = len(m1)
M = len(m2[0])
for i in range(N):
    for j in range(M):
        results.append(pool.apply_async(getProductElement, (m1, m2, i, j)))

Затем заполните полученную матрицу результатами:

m = []
count = 0
for i in range(N):
    column = []
    for j in range(M):
        column.append(results[count].get())
    m.append(column)

Опять же, точная форма кода зависит от того, как вы представляете матрицы.

0 голосов
/ 16 марта 2012

Вы не делаете.

Либо они возвращают свои правки в формате, который вы можете использовать в основной программе, либо вы используете какой-то тип межпроцессного взаимодействия, чтобы они отправляли свои правки, либо вы используетекакое-то общее хранилище, такое как база данных или сервер структур данных, например, redis.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...