Более быстрый способ найти те же самые числа в последовательности в массиве numpy - PullRequest
1 голос
/ 11 февраля 2020

Прямо сейчас я просто циклически использую np.nditer() и сравниваю с предыдущим элементом. Есть ли (векторизованный) подход, который быстрее?

Дополнительным бонусом является тот факт, что мне не всегда нужно go до конца массива; как только последовательность max_len найдена, я заканчиваю поиск.

import numpy as np

max_len = 3
streak = 0
prev = np.nan

a = np.array([0, 3, 4, 3, 0, 2, 2, 2, 0, 2, 1])

for c in np.nditer(a):
  if c == prev:
      streak += 1
      if streak == max_len:
          print(c)
          break
  else:
      prev = c
      streak = 1

Альтернатива, о которой я подумал, - это использование np.diff(), но это только снимает проблему; теперь мы ищем последовательность нулей в ее результате. Также я сомневаюсь, что это будет быстрее, так как ему придется вычислять разницу для каждого целого числа, тогда как на практике последовательность будет происходить до того, как достигнуть конца списка чаще, чем нет.

Ответы [ 4 ]

1 голос
/ 11 февраля 2020

Я разработал numpy -только версию, которая работает, но после тестирования я обнаружил, что она работает довольно плохо, потому что не может воспользоваться коротким замыканием . Так как это то, что вы просили, я опишу это ниже. Тем не менее, есть намного лучший подход, использующий numba со слегка измененной версией вашего кода. (Обратите внимание, что все они возвращают индекс первого совпадения в a, а не само значение. Я считаю этот подход более гибким.)

@numba.jit(nopython=True)
def find_reps_numba(a, max_len):
    streak = 1
    val = a[0]
    for i in range(1, len(a)):
        if a[i] == val:
            streak += 1
            if streak >= max_len:
                return i - max_len + 1
        else:
            streak = 1
            val = a[i]
    return -1

Это оказывается в ~ 100 раз быстрее, чем чистая версия Python.

Версия numpy использует трюк с скользящим окном и трюк argmax . Но опять же, оказывается, что это намного медленнее, чем даже в чистой версии Python, примерно в 30 раз.

def rolling_window(a, window):
    a = numpy.ascontiguousarray(a)  # This approach requires a C-ordered array
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return numpy.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

def find_reps_numpy(a, max_len):
    windows = rolling_window(a, max_len)
    return (windows == windows[:, 0:1]).sum(axis=1).argmax()

Я проверил оба этих варианта в сравнении с первой версией функции без сопряжения. (Для тестирования я использовал функцию %%timeit компании Jupyter.)

a = numpy.random.randint(0, 100, 1000000)

%%timeit
find_reps_numpy(a, 3)
28.6 ms ± 553 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
find_reps_orig(a, 3)
4.04 ms ± 40.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

%%timeit
find_reps_numba(a, 3)
8.29 µs ± 89.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

Обратите внимание, что эти цифры могут сильно различаться в зависимости от того, как глубоко в a функции должны искать. Чтобы лучше оценить ожидаемую производительность, мы можем каждый раз заново генерировать новый набор случайных чисел, но это трудно сделать без учета этого шага во времени. Поэтому для сравнения здесь я включаю время, необходимое для генерации случайного массива без выполнения чего-либо еще:

a = numpy.random.randint(0, 100, 1000000)
9.91 ms ± 129 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

a = numpy.random.randint(0, 100, 1000000)
find_reps_numpy(a, 3)
38.2 ms ± 453 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

a = numpy.random.randint(0, 100, 1000000)
find_reps_orig(a, 3)
13.7 ms ± 404 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

a = numpy.random.randint(0, 100, 1000000)
find_reps_numba(a, 3)
9.87 ms ± 124 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Как видите, find_reps_numba настолько быстр, что разница во времени, необходимом для запуска numpy.random.randint(0, 100, 1000000) намного больше - отсюда и иллюзорное ускорение между первым и последним тестами.

Итак, большая мораль этой истории в том, что решения numpy не всегда являются лучшими. Иногда даже чистый Python быстрее. В этих случаях numba в режиме nopython может быть лучшим вариантом на сегодняшний день.

1 голос
/ 11 февраля 2020

Если вы ищете элемент, который появляется по крайней мере max_len раз подряд, вот один способ NumPy -

m = np.r_[True,a[:-1]!=a[1:],True]
idx0 = np.flatnonzero(m)
m2 = np.diff(idx0)>=max_len
out = None # None for no such streak found case
if m2.any():
    out = a[idx0[m2.argmax()]]

Другой с binary-dilation -

from scipy.ndimage.morphology import binary_erosion

m = np.r_[False,a[:-1]==a[1:]]
m2 = binary_erosion(m, np.ones(max_len-1, dtype=bool))
out = None
if m2.any():
    out = a[m2.argmax()]

Наконец, для полноты вы также можете посмотреть на numba. Ваш существующий код будет работать как есть, с прямым циклом по a, то есть for c in a:.

1 голос
/ 11 февраля 2020

Вы можете создать подмассивы длиной max_length, каждый раз перемещая одну позицию вправо (например, нграмм ), и проверять, равна ли сумма одного sub_array, деленного на max_length, равной первый элемент этого подмассива.

Если это правда, то вы нашли последовательную последовательность целых чисел длины max_length.

def get_conseq(array, max_length):
    sub_arrays = zip(*[array[i:] for i in range(max_length)])
    for e in sub_arrays:
        if sum(e) / len(e) == e[0]:
            print("Found : {}".format(e))
            return e
    print("Nothing found")
    return []

Например, этот массив [1,2,2,3,4,5] с max_length = 2 будет 'split' как это: [1,2] [2,2] [2,3] [3,4] [4,5]

На втором элементе, [2,2], сумма равна 4, деленная на max_length дает 2, и который соответствует первому элементу этой подгруппы, и функция возвращает.

Вы можете break, если вы предпочитаете это делать, вместо того, чтобы возвращать, как я.

Вы также можете добавить несколько правил для захвата крайних случаев , чтобы сделать вещи чистыми (пустой массив, max_length превосходит длину массива и т. д. c).

Вот несколько примеров звонки:

>>> splits([1,2,3,4,5,6], 2)
Nothing found

>>> splits([1,2,2,3,4,5,6], 3)
Nothing found

>>> splits([1,2,3,3,3], 3)
Found : [3, 3, 3]

>>> splits([1,2,2,3,3], 2)
Found : [2, 2]

Надеюсь, это поможет!

1 голос
/ 11 февраля 2020

Вы можете использовать groupby из пакета itertools.

import numpy as np
from itertools import groupby

max_len = 3
best = ()

a = np.array([0, 3, 4, 3, 0, 2, 2, 2, 0, 2, 1])

for k, g in groupby(a):
    tup_g = tuple(g)
    if tup_g==max_len:
        best = tup_g
        break
    if len(tup_g) > len(best):
        best = tup_g

best
# returns:
(2, 2, 2)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...