Стоимость создания новых процессов или копирования матриц между ними, если процессы используются повторно, затмевает стоимость умножения матриц.В любом случае numpy.dot()
может использовать разные ядра процессора.
Умножение матриц может быть распределено между процессами путем вычисления разных строк результата в разных процессах, например, при заданных входных матрицах a
и b
, а затемЭлемент result (i,j)
равен:
out[i,j] = sum(a[i,:] * b[:,j])
Таким образом, i
-ая строка может быть вычислена как:
import numpy as np
def dot_slice(a, b, out, i):
t = np.empty_like(a[i,:])
for j in xrange(b.shape[1]):
# out[i,j] = sum(a[i,:] * b[:,j])
np.multiply(a[i,:], b[:,j], t).sum(axis=1, out=out[i,j])
numpy
массив принимает срез в качестве индекса, например,a[1:3,:]
возвращает 2-ю и 3-ю строки.
a
, b
доступны только для чтения, поэтому они могут наследоваться как есть дочерними процессами ( с использованием функции копирования при записи в Linux ), результат вычисляется с использованием разделяемого массива.Во время вычислений копируются только индексы:
import ctypes
import multiprocessing as mp
def dot(a, b, nprocesses=mp.cpu_count()):
"""Perform matrix multiplication using multiple processes."""
if (a.shape[1] != b.shape[0]):
raise ValueError("wrong shape")
# create shared array
mp_arr = mp.RawArray(ctypes.c_double, a.shape[0]*b.shape[1])
# start processes
np_args = mp_arr, (a.shape[0], b.shape[1]), a.dtype
pool = mp.Pool(nprocesses, initializer=init, initargs=(a, b)+np_args)
# perform multiplication
for i in pool.imap_unordered(mpdot_slice, slices(a.shape[0], nprocesses)):
print("done %s" % (i,))
pool.close()
pool.join()
# return result
return tonumpyarray(*np_args)
Где:
def mpdot_slice(i):
dot_slice(ga, gb, gout, i)
return i
def init(a, b, *np_args):
"""Called on each child process initialization."""
global ga, gb, gout
ga, gb = a, b
gout = tonumpyarray(*np_args)
def tonumpyarray(mp_arr, shape, dtype):
"""Convert shared multiprocessing array to numpy array.
no data copying
"""
return np.frombuffer(mp_arr, dtype=dtype).reshape(shape)
def slices(nitems, mslices):
"""Split nitems on mslices pieces.
>>> list(slices(10, 3))
[slice(0, 4, None), slice(4, 8, None), slice(8, 10, None)]
>>> list(slices(1, 3))
[slice(0, 1, None), slice(1, 1, None), slice(2, 1, None)]
"""
step = nitems // mslices + 1
for i in xrange(mslices):
yield slice(i*step, min(nitems, (i+1)*step))
Чтобы проверить это:
def test():
n = 100000
a = np.random.rand(50, n)
b = np.random.rand(n, 60)
assert np.allclose(np.dot(a,b), dot(a,b, nprocesses=2))
В Linux эта многопроцессорная версия имеет ту же производительность, что и решение, использующее потоки и высвобождающее GIL (в расширении C) во время вычислений :
$ python -mtimeit -s'from test_cydot import a,b,out,np' 'np.dot(a,b,out)'
100 loops, best of 3: 9.05 msec per loop
$ python -mtimeit -s'from test_cydot import a,b,out,cydot' 'cydot.dot(a,b,out)'
10 loops, best of 3: 88.8 msec per loop
$ python -mtimeit -s'from test_cydot import a,b; import mpdot' 'mpdot.dot(a,b)'
done slice(49, 50, None)
..[snip]..
done slice(35, 42, None)
10 loops, best of 3: 82.3 msec per loop
Примечание: тест был изменен для использования np.float64
везде.