РЕДАКТИРОВАТЬ: Чтобы исправить приведенный ниже код в соответствии с исправленным вопросом, требуется всего пара незначительных изменений в compute
:
def compute(m, n):
m = np.asarray(m)
n = np.asarray(n)
# Apply mask N in advance
m2 = m & n
# Pack booleans into uint8 for more efficient bitwise operations
# Also transpose for better caching (maybe?)
mb = np.packbits(m2.T, axis=1)
# Table with number of ones in each uint8
num_bits = (np.arange(256)[:, np.newaxis] & (1 << np.arange(8))).astype(bool).sum(1)
# Allocate output array
out = np.zeros((m2.shape[1], m2.shape[1]), np.int32)
# Do the counting with Numba
_compute_nb(mb, num_bits, out)
# Make output symmetric
out = out + out.T
# Add values in diagonal
out[np.diag_indices_from(out)] = m2.sum(0)
# Scale by number of ones in n
return out
Я бы сделал это с Numba, используя несколько трюков. Во-первых, вы можете выполнять только половину операций со столбцами, так как другая половина повторяется. Во-вторых, вы можете упаковать логические значения в байты, чтобы с каждым &
вы работали над восемью значениями вместо одного. В-третьих, вы можете использовать многопроцессорность для его распараллеливания. В целом, вы можете сделать это следующим образом:
import numpy as np
import numba as nb
def compute(m, n):
m = np.asarray(m)
n = np.asarray(n)
# Pack booleans into uint8 for more efficient bitwise operations
# Also transpose for better caching (maybe?)
mb = np.packbits(m.T, axis=1)
# Table with number of ones in each uint8
num_bits = (np.arange(256)[:, np.newaxis] & (1 << np.arange(8))).astype(bool).sum(1)
# Allocate output array
out = np.zeros((m.shape[1], m.shape[1]), np.int32)
# Do the counting with Numba
_compute_nb(mb, num_bits, out)
# Make output symmetric
out = out + out.T
# Add values in diagonal
out[np.diag_indices_from(out)] = m.sum(0)
# Scale by number of ones in n
out *= n.sum()
return out
@nb.njit(parallel=True)
def _compute_nb(mb, num_bits, out):
# Go through each pair of columns without repetitions
for i in nb.prange(mb.shape[0] - 1):
for j in nb.prange(1, mb.shape[0]):
# Count common bits
v = 0
for k in range(mb.shape[1]):
v += num_bits[mb[i, k] & mb[j, k]]
out[i, j] = v
# Test
m = np.array([[ True, True, False, True],
[False, True, True, True],
[False, False, False, False],
[False, True, False, False],
[ True, True, False, False]])
n = np.array([[ True],
[False],
[ True],
[ True],
[ True]])
out = compute(m, n)
print(out)
# [[ 8 8 0 4]
# [ 8 16 4 8]
# [ 0 4 4 4]
# [ 4 8 4 8]]
В качестве быстрого сравнения приведу небольшой тест по сравнению с оригинальными методами * oop и NumPy (только я уверен, что предложения Divakar - лучшее, что вы можете получить от NumPy):
import numpy as np
# Original loop
def compute_loop(m, n):
out = np.zeros((m.shape[1], m.shape[1]), np.int32)
for i in range(m.shape[1]):
for j in range(m.shape[1]):
result = m[:, i] & m[:, j]
out[i, j] = np.sum(result & n)
return out
# Divakar methods
def compute2(m, n):
return np.einsum('ij,ik,lm->jk', m, m.astype(int), n)
def compute3(m, n):
return np.einsum('ij,ik->jk',m, m.astype(int)) * n.sum()
def compute4(m, n):
return np.tensordot(m, m.astype(int),axes=((0,0))) * n.sum()
def compute5(m, n):
return m.T.dot(m.astype(int))*n.sum()
# Make random data
np.random.seed(0)
m = np.random.rand(1000, 100) > .5
n = np.random.rand(1000, 1) > .5
print(compute(m, n).shape)
# (100, 100)
%timeit compute(m, n)
# 768 µs ± 17.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit compute_loop(m, n)
# 11 s ± 1.23 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit compute2(m, n)
# 7.65 s ± 1.06 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit compute3(m, n)
# 23.5 ms ± 1.53 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit compute4(m, n)
# 8.96 ms ± 194 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit compute5(m, n)
# 8.35 ms ± 266 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)