Подсчет перекрытия между соседними индексами в массиве NumPy - PullRequest
2 голосов
/ 06 марта 2020

У меня есть NumPy массив целых чисел:

x = np.array([1, 0, 2, 1, 4, 1, 4, 1, 0, 1, 4, 3, 0, 1, 0, 2, 1, 4, 3, 1, 4, 1, 0])

и другой массив индексов, который ссылается на указанный выше массив:

indices = np.array([22, 12, 8, 1, 14, 21, 7, 0, 13, 19, 5, 3, 9, 16, 2, 15, 11, 18, 20, 6, 4, 10, 17])

Для каждой пары соседних индексов мы нужно посчитать, сколько последовательных значений в x перекрываются, начиная с каждого из двух соседних индексов. Например, для indices[2] и indices[3] у нас есть индексы 8 и 1 соответственно, и они обе имеют ссылочные позиции в x. Затем, начиная с x[8] и x[1], мы подсчитываем, сколько последовательных значений совпадают или перекрываются, но мы прекращаем проверять перекрытие при определенных условиях c (см. Ниже). Другими словами, мы проверяем:

  1. x[8] == x[1]
  2. x[9] == x[2] # увеличивает каждый индекс на один
  3. ... # продолжает увеличивать каждый индекс, кроме в следующих условиях
  4. остановится, если i >= x.shape[0]
  5. остановится, если j >= x.shape[0]

6. остановка, если x[i] == 0 7. остановить если x[j] == 0

останов, если x[i] != x[j]

В действительности, мы делаем это для всех соседних индексных пар:

out = np.zeros(indices.shape[0], dtype=int)
for idx in range(indices.shape[0]-1):
    count = 0
    i = indices[idx]
    j = indices[idx + 1]
    k = 0
    # while i+k < x.shape[0] and j+k < x.shape[0] and x[i+k] != 0 and x[j+k] != 0 and x[i+k] == x[j+k]:
    while i+k < x.shape[0] and j+k < x.shape[0] and x[i+k] == x[j+k]:
        count += 1
        k += 1
        out[idx] = k

И вывод:

# [0, 0, 0, 0, 0, 1, 1, 1, 1, 3, 3, 2, 3, 0, 3, 0, 1, 0, 2, 2, 1, 2, 0]  # This is the old output if x[i] == 0 and x[j] == 0 are included

[1 2 1 4 0 2 2 5 1 4 3 2 3 0 3 0 1 0 3 2 1 2 0]

Я ищу векторизованный способ сделать это в NumPy.

1 Ответ

0 голосов
/ 08 марта 2020

Это должно сработать (я игнорирую два условия x[i]=0 и x[j]=0)

for idx in range(indices.shape[0]-1):

    i = indices[idx]
    j = indices[idx + 1]

    l = len(x) - max(i,j)
    x1 = x[i:i+l]
    x2 = x[j:j+l]

    # Add False at the end to handle the case in which arrays are exactly the same
    x0 = np.append(x1==x2, False)

    out[idx] = np.argmin(x0)

Обратите внимание, что с np.argmin я использую следующие два факта:

  • False < True
  • np.argmin возвращает только первый экземпляр минимума в массиве

Анализ производительности

Что касается производительности по времени, я протестировано с N=10**5 и N=10**6, и, как предлагается в комментариях, это не может конкурировать с numba jit.

def f(x, indices):

    out = np.zeros(indices.shape[0], dtype=int)

    for idx in range(indices.shape[0]-1):

        i = indices[idx]
        j = indices[idx + 1]

        l = len(x) - max(i,j)
        x1 = x[i:i+l]
        x2 = x[j:j+l]

        x0 = np.append(x1==x2, False)

        out[idx] = np.argmin(x0)

    return out

N=100_000
x = np.random.randint(0,10, N)
indices = np.arange(0, N)
np.random.shuffle(indices)

%timeit f(x, indices)
3.67 s ± 122 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

N=1_000_000
x = np.random.randint(0,10, N)
indices = np.arange(0, N)
np.random.shuffle(indices)

%time f(x, indices)
Wall time: 8min 20s

(у меня не хватило терпения разрешить %timeit fini sh)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...