Как векторизовать эту 2D матричную операцию? - PullRequest
3 голосов
/ 08 октября 2019

У меня есть 2 массива логических масок, и я рассчитываю вычислить операцию для каждой комбинации двух масок.

Медленная версия

N = 10000
M = 580
masksA = np.array(np.random.randint(0,2, size=(N,M)), dtype=np.bool)
masksB = np.array(np.random.randint(0,2, size=(N,M)), dtype=np.bool)

result = np.zeros(shape=(N,N), dtype=np.float)
for i in range(N):
    for j in range(N):
        result[i,j] = np.float64(np.count_nonzero(masksB[i,:]==masksB[j,:])) / M

Более быстрая версия

for i in range(N):
    result[i,:] = np.array(np.count_nonzero(masksB[i]==masksA, axis=1), dtype=np.float64) / M

Можно ли сделать это еще быстрее с помощью одного цикла?

Ответы [ 2 ]

3 голосов
/ 08 октября 2019

Это в основном сравнение с внешним равенством, сохраняющее первую из этих осей маски выровненной. Мы можем использовать matrix-multiplication с идеей, что умножение матриц самих масок и их одновременных отрицательных версий приведет нас к этим внешним суммированиям равенства. Следовательно, для суммирования равенств просто выполните -

out = (~masksB).astype(int).dot(~masksA.T) + masksB.astype(int).dot(masksA.T)

В качестве альтернативы, получите версию int и используйте ее, чтобы получить также отрицательную версию -

mB = masksB.astype(int)
out = (1-mB).dot(~masksA.T) + mB.dot(masksA.T)

Окончательный выводбудет out/float(M). Или мы можем заменить эти int преобразования на float, а затем разделить вывод на M.

Синхронизация (с N вместо 1000) для получения суммирования равенства

# Setup
In [39]: np.random.seed(0)
    ...: N = 1000
    ...: M = 580
    ...: masksA = np.array(np.random.randint(0,2, size=(N,M)), dtype=np.bool)
    ...: masksB = np.array(np.random.randint(0,2, size=(N,M)), dtype=np.bool)

# Approach #1 with int dtype converison
In [40]: %timeit (~masksB).astype(int).dot(~masksA.T) + masksB.astype(int).dot(masksA.T)
1 loop, best of 3: 870 ms per loop

# Approach #1 with float dtype converison
In [41]: %timeit (~masksB).astype(float).dot(~masksA.T) + masksB.astype(float).dot(masksA.T)
10 loops, best of 3: 74 ms per loop

# Approach #2 with int dtype converison
In [42]: %%timeit
    ...: mB = masksB.astype(int)
    ...: out = (1-mB).dot(~masksA.T) + mB.dot(masksA.T)
1 loop, best of 3: 882 ms per loop

# Approach #2 with float dtype converison
In [43]: %%timeit
    ...: mB = masksB.astype(float)
    ...: out = (1-mB).dot(~masksA.T) + mB.dot(masksA.T)
10 loops, best of 3: 59.3 ms per loop

Таким образом, предпочтительным способом было бы использовать преобразование с плавающей точкой при втором подходеи разделите выходные данные на M.

Используйте одно умножение матриц со суммированием

Так как суммы по существу равны int числам, мы можем использоватьменьшая точность dtype для умножения матриц. Кроме того, еще одной идеей будет сложить исходные маски с их отрицательными версиями, а затем выполнить матричное умножение. Следовательно, для суммирования равенства мы бы имели -

m1 = np.hstack((masksA,~masksA))
m2 = np.hstack((masksB,~masksB))
out = m2.astype(np.float32).dot(m1.T)

Таймингс (такая же настройка, как и ранее) -

In [49]: %%timeit
    ...: m1 = np.hstack((masksA,~masksA))
    ...: m2 = np.hstack((masksB,~masksB))
    ...: out = m2.astype(np.float32).dot(m1.T)
10 loops, best of 3: 36.8 ms per loop
2 голосов
/ 08 октября 2019

Мы можем эффективно сократить лучшее время @ Дивакара (текущее) в два раза, используя только одно умножение матриц:

import numpy as np

N = 1000
M = 580
masksA = np.array(np.random.randint(0,2, size=(N,M)), dtype=np.bool)
masksB = np.array(np.random.randint(0,2, size=(N,M)), dtype=np.bool)

def f_pp():
    return np.where(masksB,1.0,-1.0)@np.where(masksA,0.5/M,-0.5/M).T+0.5

def f_pp_2():
    return (np.where(masksB,1.0,-1.0)@np.where(masksA,0.5,-0.5).T+0.5*M)*(1/M)

def f_dv_2():
    mB = masksB.astype(float)
    return ((1-mB).dot(~masksA.T) + mB.dot(masksA.T))*(1/M)

assert np.allclose(f_pp(),f_dv_2())
assert np.all(f_pp_2()==f_dv_2())

from timeit import timeit

print("PP     ",timeit(f_pp,number=100)*10,"ms")
print("PP 2   ",timeit(f_pp_2,number=100)*10,"ms")
print("Divakar",timeit(f_dv_2,number=100)*10,"ms")

Пример выполнения:

PP      31.41063162125647 ms
PP 2    31.757128546014428 ms
Divakar 63.400797033682466 ms

Как это работает? Сопоставляя значения True и False с противоположными числами x и -x (для двух факторов можно использовать разные x; скажем, a для масок A и b для масок B), мы получаем в точечном произведении aтермин ab для каждой позиции, где маски совпадают, и термин -ab для каждой позиции, где они разные. Если k - это число равных битов между векторами пары, то их скалярное произведение будет kab-(M-k)ab = 2kab - Mab. Выбор a и b таким, что 2Mab = 1 становится k/M - 1/2, что с точностью до постоянного смещения нашего желаемого результата.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...