Для маленьких M's
мы можем присвоить каждому из xs
уникальную метку на основе комбинации целых чисел в ней. Для того же самого мы можем использовать свертку с массивом scaling и, следовательно, уменьшить каждый из xs
до скаляра. Наконец, мы используем метод сопоставления для обнаружения и, следовательно, подсчета.
Единственным дополнительным ресурсом будет преобразование в массив из списка массивов. Так что, если создание списка заранее оптимизировано для получения массива, это очень помогло бы для конечных показателей производительности.
Реализация будет выглядеть примерно так -
x = np.asarray(xs) # convert to array, if not already done
s = M**np.arange(x.shape[1])
yr = np.convolve(y,s[::-1])
xr = x.dot(s)
# Final step : Match and get count
N = np.maximum(xr.max(),yr.max())+1 # or use s[-1]*M if M is small enough
l = np.zeros(N, dtype=bool)
l[yr] = True
count = l[xr].sum()
Варианты выполнения Final step
Вариант № 1:
sidx = yr.argsort()
idx = np.searchsorted(yr,xr,sorter=sidx)
idx[idx==len(yr)] = 0
count = (yr[sidx[idx]] == xr).sum()
Вариант № 2:
from scipy.sparse import csr_matrix
ly = len(yr)
y_extent = yr.max()+1 # or use s[-1]*M if M is small enough
r = np.zeros(ly, dtype=np.uint64)
val = np.ones(ly, dtype=np.bool)
sp_mat = csr_matrix((val, (r,yr)), shape=(1,y_extent))
count = sp_mat[:,xr].sum()
Вариант № 3:
Для большего M
числа мы можем использовать empty
массивы вместо -
l = np.empty(N, dtype=bool)
l[xr] = False
l[yr] = True
count = l[xr].sum()
Копать дальше (Усиливая numba
на convolution
)
Профилирование основного предложенного решения показывает, что сверточная часть 1D
была трудоемкой. Далее, мы видим, что сверточный код 1D
имеет специфицированное ядро c, которое по своей природе имеет геометрию c. Это может быть реализовано в O(n)
после повторного использования граничных элементов на итерацию. Обратите внимание, что это будет в основном инвертированное ядро по сравнению с предложенным ранее. Итак, добавив все эти изменения, мы получим что-то вроде этого -
from numba import njit
@njit
def numba1(y, conv_out, M, L, N):
A = M**L
for i in range(1,N):
conv_out[i] = conv_out[i-1]*M + y[i+L-1] - y[i-1]*A
return conv_out
def numba_convolve(y, M, L):
N = len(y)-L+1
conv_out = np.empty(N, dtype=int)
conv_out[0] = y[:L].dot(M**np.arange(L-1,-1,-1))
return numba1(y, conv_out, M, L, N)
def intersection_count(xs, y):
x = np.asarray(xs) # convert to array, if not already done
L = x.shape[1]
s = M**np.arange(L-1,-1,-1)
xr = x.dot(s)
yr_numba = numba_convolve(y, M=M, L=L)
# Final step : Match and get count
N = s[0]*M
l = np.zeros(N, dtype=bool)
l[yr_numba] = True
count = l[xr].sum()
return count
Бенчмаркинг
Мы будем повторно использовать настройки из вопроса.
In [42]: %%timeit
...: y_bytes = y.tobytes()
...: p = sum(1 for x in xs if x.tobytes() in y_bytes)
927 ms ± 3.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [43]: %timeit intersection_count(xs, y)
7.55 ms ± 71.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Как отмечалось ранее, узким местом может быть преобразование в массив. Итак, давайте рассмотрим и эту часть -
In [44]: %timeit np.asarray(xs)
3.41 ms ± 10.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Итак, часть преобразования массива составляет около 45%
общего времени выполнения, и это существенная часть. Следовательно, предложение работать с двумерным массивом в отличие от списка одномерных массивов становится критически важным в этот момент. Положительным моментом является то, что данные массива дают нам возможность векторизации и, следовательно, улучшают общую производительность. Просто чтобы подчеркнуть доступность 2D массива, вот ускорения с и без -
In [45]: 927/7.55
Out[45]: 122.78145695364239
In [46]: 927/(7.55-3.41)
Out[46]: 223.91304347826087