Многократный поиск подстроки в массиве numpy с несколькими уникальными значениями - PullRequest
1 голос
/ 02 февраля 2020

Вдохновленный этим вопросом: предположим, что у меня есть список из нескольких 1D numpy массивов xs, и я хотел бы знать, сколько из них встречается как "подстроки" другого большего 1D numpy массив y.

Мы можем предположить, что массивы содержат целые числа и что a является подстрокой b, если a == b[p:q] для некоторых целых чисел p и q.

Мое предлагаемое решение использует оператор in объекта Python bytes, но я полагаю, что он неэффективен, если xs имеет много элементов:

import numpy as np

N = 10_000    # number of arrays to search
M = 3         # "alphabet" size 
K = 1_000_000 # size of the target array

xs = [np.random.randint(0, M, size=7) for _ in range(N)]
y = np.random.randint(0, M, size=K)

y_bytes = y.tobytes()
%time num_matches = sum(1 for x in xs if x.tobytes() in y_bytes)
# CPU times: user 1.03 s, sys: 17 µs, total: 1.03 s
# Wall time: 1.03 s

Если M велико (число возможных значения в y любого из xs s велики, тогда я думаю, что мало что можно сделать, чтобы ускорить это. Тем не менее, для небольших M я представляю, что использование tr ie или чего-то подобного может быть полезным. Есть ли эффективный способ реализовать это в Python, возможно, используя numpy / numba?

1 Ответ

1 голос
/ 20 февраля 2020

Для маленьких M's мы можем присвоить каждому из xs уникальную метку на основе комбинации целых чисел в ней. Для того же самого мы можем использовать свертку с массивом scaling и, следовательно, уменьшить каждый из xs до скаляра. Наконец, мы используем метод сопоставления для обнаружения и, следовательно, подсчета.

Единственным дополнительным ресурсом будет преобразование в массив из списка массивов. Так что, если создание списка заранее оптимизировано для получения массива, это очень помогло бы для конечных показателей производительности.

Реализация будет выглядеть примерно так -

x = np.asarray(xs) # convert to array, if not already done

s = M**np.arange(x.shape[1])
yr = np.convolve(y,s[::-1])
xr = x.dot(s)

# Final step : Match and get count
N = np.maximum(xr.max(),yr.max())+1 # or use s[-1]*M if M is small enough
l = np.zeros(N, dtype=bool)
l[yr] = True
count = l[xr].sum()

Варианты выполнения Final step

Вариант № 1:

sidx = yr.argsort()
idx = np.searchsorted(yr,xr,sorter=sidx)
idx[idx==len(yr)] = 0
count = (yr[sidx[idx]] == xr).sum()

Вариант № 2:

from scipy.sparse import csr_matrix

ly = len(yr)
y_extent = yr.max()+1  # or use s[-1]*M if M is small enough
r = np.zeros(ly, dtype=np.uint64)
val = np.ones(ly, dtype=np.bool)
sp_mat = csr_matrix((val, (r,yr)), shape=(1,y_extent))
count = sp_mat[:,xr].sum()

Вариант № 3:

Для большего M числа мы можем использовать empty массивы вместо -

l = np.empty(N, dtype=bool)
l[xr] = False
l[yr] = True
count = l[xr].sum()

Копать дальше (Усиливая numba на convolution)

Профилирование основного предложенного решения показывает, что сверточная часть 1D была трудоемкой. Далее, мы видим, что сверточный код 1D имеет специфицированное ядро ​​c, которое по своей природе имеет геометрию c. Это может быть реализовано в O(n) после повторного использования граничных элементов на итерацию. Обратите внимание, что это будет в основном инвертированное ядро ​​по сравнению с предложенным ранее. Итак, добавив все эти изменения, мы получим что-то вроде этого -

from numba import njit

@njit
def numba1(y, conv_out, M, L, N):
    A = M**L
    for i in range(1,N):
        conv_out[i] = conv_out[i-1]*M + y[i+L-1] - y[i-1]*A
    return conv_out

def numba_convolve(y, M, L):
        N = len(y)-L+1
        conv_out = np.empty(N, dtype=int)
        conv_out[0] = y[:L].dot(M**np.arange(L-1,-1,-1))
        return numba1(y, conv_out, M, L, N)

def intersection_count(xs, y):
    x = np.asarray(xs) # convert to array, if not already done

    L = x.shape[1]
    s = M**np.arange(L-1,-1,-1)
    xr = x.dot(s)

    yr_numba = numba_convolve(y, M=M, L=L)

    # Final step : Match and get count
    N = s[0]*M
    l = np.zeros(N, dtype=bool)
    l[yr_numba] = True
    count = l[xr].sum()
    return count

Бенчмаркинг

Мы будем повторно использовать настройки из вопроса.

In [42]: %%timeit
    ...: y_bytes = y.tobytes()
    ...: p = sum(1 for x in xs if x.tobytes() in y_bytes)
927 ms ± 3.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [43]: %timeit intersection_count(xs, y)
7.55 ms ± 71.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Как отмечалось ранее, узким местом может быть преобразование в массив. Итак, давайте рассмотрим и эту часть -

In [44]: %timeit np.asarray(xs)
3.41 ms ± 10.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Итак, часть преобразования массива составляет около 45% общего времени выполнения, и это существенная часть. Следовательно, предложение работать с двумерным массивом в отличие от списка одномерных массивов становится критически важным в этот момент. Положительным моментом является то, что данные массива дают нам возможность векторизации и, следовательно, улучшают общую производительность. Просто чтобы подчеркнуть доступность 2D массива, вот ускорения с и без -

In [45]: 927/7.55
Out[45]: 122.78145695364239

In [46]: 927/(7.55-3.41)
Out[46]: 223.91304347826087
...