Вычисление немного другого умножения матриц - PullRequest
1 голос
/ 05 июня 2019

Я пытаюсь найти лучший способ для вычисления минимальных поэлементных произведений между двумя наборами векторов. Обычное матричное умножение C=A@B вычисляет Cij как сумму попарных произведений элементов векторов Ai и B^Tj. Я хотел бы выполнить вместо этого минимум парных продуктов. Я не могу найти эффективный способ сделать это между двумя матрицами с NumPy.

Один из способов добиться этого - сгенерировать трехмерную матрицу парных произведений между A и B (до суммы), а затем взять минимум над третьим измерением. Но это привело бы к огромному следу памяти (и я фактически не знаю, как сделать это).

У вас есть идеи, как я мог бы выполнить эту операцию?

Пример:

A = [[1,1],[1,1]]
B = [[0,2],[2,1]]

матричная матрица:

C = [[1*0+1*2,1*2+1*1][1*0+1*2,1*2+1*1]] = [[2,3],[2,3]]

минимальное значение:

C = [[min(1*0,1*2),min(1*2,1*1)][min(1*0,1*2),min(1*2,1*1)]] = [[0,1],[0,1]]

Ответы [ 2 ]

1 голос
/ 05 июня 2019

Numba также может быть опцией

Я был немного удивлен не очень хорошими временами Numexpr, поэтому я попробовал версию Numba. Для больших массивов это можно оптимизировать дальше. (Могут применяться те же принципы, что и для dgemm)

import numpy as np
import numba as nb
import numexpr as ne

@nb.njit(fastmath=True,parallel=True)
def min_pairwise_prod(A,B):
    assert A.shape[1]==B.shape[1]
    res=np.empty((A.shape[0],B.shape[0]))

    for i in nb.prange(A.shape[0]):
        for j in range(B.shape[0]):
            min_prod=A[i,0]*B[j,0]
            for k in range(B.shape[1]):
                prod=A[i,k]*B[j,k]
                if prod<min_prod:
                    min_prod=prod
            res[i,j]=min_prod
    return res

Задержка

A=np.random.rand(300,300)
B=np.random.rand(300,300)

%timeit res_1=min_pairwise_prod(A,B) #parallel=True
5.56 ms ± 1.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res_1=min_pairwise_prod(A,B) #parallel=False
26 ms ± 163 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit res_2 = ne.evaluate('min(A3D*B,2)',{'A3D':A[:,None]})
87.7 ms ± 265 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit res_3=np.min(A[:,None]*B,axis=2)
110 ms ± 214 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

A=np.random.rand(1000,300)
B=np.random.rand(1000,300)
%timeit res_1=min_pairwise_prod(A,B) #parallel=True
50.6 ms ± 401 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res_1=min_pairwise_prod(A,B) #parallel=False
296 ms ± 5.02 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res_2 = ne.evaluate('min(A3D*B,2)',{'A3D':A[:,None]})
992 ms ± 7.59 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res_3=np.min(A[:,None]*B,axis=2)
1.27 s ± 15.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1 голос
/ 05 июня 2019

Используйте broadcasting после расширения A до 3D -

A = np.asarray(A)
B = np.asarray(B)
C_out = np.min(A[:,None]*B,axis=2)

Если вам не хватает памяти, используйте numexpr module чтобы быть эффективными в этом -

import numexpr as ne

C_out = ne.evaluate('min(A3D*B,2)',{'A3D':A[:,None]})

Времена на больших массивах -

In [12]: A = np.random.rand(200,200)

In [13]: B = np.random.rand(200,200)

In [14]: %timeit np.min(A[:,None]*B,axis=2)
34.4 ms ± 614 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [15]: %timeit ne.evaluate('min(A3D*B,2)',{'A3D':A[:,None]})
29.3 ms ± 316 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [16]: A = np.random.rand(300,300)

In [17]: B = np.random.rand(300,300)

In [18]: %timeit np.min(A[:,None]*B,axis=2)
113 ms ± 2.27 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [19]: %timeit ne.evaluate('min(A3D*B,2)',{'A3D':A[:,None]})
102 ms ± 691 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

Итак, с numexpr есть некоторое улучшение, но, возможно, не так много, как я ожидал.

...