Можно ли реализовать эту версию умножения матриц, используя Numpy? - PullRequest
3 голосов
/ 11 февраля 2020

Я хочу быстро оценить приведенную ниже функцию, которая на высоком уровне напоминает матричное умножение. Для больших матриц приведенная ниже реализация на несколько порядков медленнее, чем numpy умножение матриц, что заставляет меня поверить, что есть лучший способ реализовать это с использованием numpy. Есть ли способ реализовать это, используя функции numpy вместо циклов for? Матрицы, с которыми я работаю, имеют диапазон 10K-100K элементов в каждом измерении, поэтому такая оптимизация крайне необходима.

Один из подходов заключается в использовании массива 3D numpy, но он оказывается слишком большим для хранения , Я также посмотрел в np.vectorize, который не кажется подходящим.

Большое спасибо за ваше руководство.

Редактировать: Спасибо всем за ваши фантазии c понимание и ответы. Очень ценю вклад. Перемещение журнала за пределы l oop значительно улучшает время выполнения, и интересно видеть, что поиск k важен. У меня есть продолжение, если я могу: вы видите способ ускорить, если внутреннее выражение l oop становится C[i,j] += A[i,k] * np.log(A[i,k] + B[k,j])? Бревно может быть перемещено наружу, как и раньше, но только в том случае, если возведено в степень A[i,k], что является дорогостоящим и исключает выгоды от перемещения из журнала.

import numpy as np
import numba
from numba import njit, prange

@numba.jit(fastmath=True, parallel=True)
def f(A, B):
    
    C = np.zeros((A.shape[0], B.shape[1]))

    for i in prange(A.shape[0]):
        for j in prange(B.shape[1]):
            for k in prange(A.shape[1]):
                
                C[i,j] += np.log(A[i,k] + B[k,j])
                #matrix mult. would be: C[i,j] += A[i,k] * B[k,j]

    return C

#A = np.random.rand(100000, 100000)
#B = np.random.rand(100000, 100000)
#f(A, B)

Ответы [ 2 ]

2 голосов
/ 11 февраля 2020

Как уже отмечал @jdehesa, вы можете использовать следующее упрощение: log(a) + log(b) == log(a * b) Но учтите, что результаты могут немного отличаться. Кроме того, есть много способов оптимизировать эту функцию. Наилучшее решение зависит от размера входных матриц и требуемой числовой стабильности.

Использование скаляра и работа с транспонированным массивом (возможно автоматическое c SIMD-векторизация)

import numpy as np

#from version 0.43 until 0.47 this has to be set before importing numba
#Bug: https://github.com/numba/numba/issues/4689
from llvmlite import binding
binding.set_option('SVML', '-vector-library=SVML')
import numba as nb

@nb.njit(fastmath=True,parallel=True)
def f_orig(A, B):
    C = np.zeros((A.shape[0], B.shape[1]))

    for i in nb.prange(A.shape[0]):
        for j in range(B.shape[1]):
            for k in range(A.shape[1]):
                C[i,j] += np.log(A[i,k] + B[k,j])
                #matrix mult. would be: C[i,j] += A[i,k] * B[k,j]

    return C

@nb.njit(fastmath=True,parallel=True)
def f_pre_opt(A, B):
    C = np.empty((A.shape[0], B.shape[1]))
    B_T=np.ascontiguousarray(B.T)

    for i in nb.prange(A.shape[0]):
        for j in range(B_T.shape[0]):
            acc=1.
            for k in range(A.shape[1]):
                acc*=(A[i,k] + B_T[j,k])
            C[i,j] = np.log(acc)

    return C

@nb.njit(fastmath=True, parallel=True)
def f_jdehesa(A, B):
    C = np.ones((A.shape[0], B.shape[1]), A.dtype)
    for i in nb.prange(A.shape[0]):
        for j in nb.prange(B.shape[1]):
            # Accumulate product
            for k in nb.prange(A.shape[1]):
                C[i,j] *= (A[i,k] + B[k,j])
    # Apply logarithm at the end
    return np.log(C)

Время

# Test
np.random.seed(0)
a, b = np.random.random((1000, 100)), np.random.random((100, 2000))

res_1=f_orig(a, b)
res_2=f_pre_opt(a, b)
res_3=f_jdehesa(a, b)

# True
%timeit f_orig(a, b)
#262 ms ± 1.76 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit f_pre_opt(a, b)
#12.4 ms ± 405 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit f_jdehesa(a, b)
#41 ms ± 2.24 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Для больших матриц это решение далеко от оптимального. Для лучшего использования кэша необходимы дополнительные оптимизации, такие как вычисление результата по блокам.

Реализация матрично-матричного умножения в реальном мире

2 голосов
/ 11 февраля 2020

Начиная с log(a) + log(b) == log(a * b), вы можете сохранить множество вычислений логарифма, заменяя сложения умножением и выполняя логарифм только в конце, что должно сэкономить вам много времени.

import numpy as np
import numba as nb

@nb.njit(fastmath=True, parallel=True)
def f(A, B):
    C = np.ones((A.shape[0], B.shape[1]), A.dtype)
    for i in nb.prange(A.shape[0]):
        for j in nb.prange(B.shape[1]):
            # Accumulate product
            for k in nb.prange(A.shape[1]):
                C[i,j] *= (A[i,k] + B[k,j])
    # Apply logarithm at the end
    return np.log(C)

# For comparison
@nb.njit(fastmath=True, parallel=True)
def f_orig(A, B):
    C = np.zeros((A.shape[0], B.shape[1]), A.dtype)
    for i in nb.prange(A.shape[0]):
        for j in nb.prange(B.shape[1]):
            for k in nb.prange(A.shape[1]):
                C[i,j] += np.log(A[i,k] + B[k,j])
    return C

# Test
np.random.seed(0)
a, b = np.random.random((1000, 100)), np.random.random((100, 2000))
print(np.allclose(f(a, b), f_orig(a, b)))
# True
%timeit f(a, b)
# 36.2 ms ± 2.91 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit f_orig(a, b)
# 296 ms ± 3.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
...