Ускорение поиска следующего индекса в другом списке индексов (numpy) - PullRequest
0 голосов
/ 05 июня 2019

Приведенный ниже код работает должным образом, но не выглядит оптимизированным из-за цикла. Мне удалось успешно векторизовать все мои другие методы, но я не могу понять, как удалить цикл на этом.

Speedwise: это становится проблемой, когда у меня миллионы строк.

Есть ли способ векторизовать это или я должен попробовать Cython или Numba? Я пытался ограничить количество используемых пакетов.

Пример кода:

import numpy as np

leading = np.array([814, 935, 1057, 3069])

within = np.array([193, 207, 243, 251, 273, 286, 405, 427, 696, 770, 883,
                   896, 1004, 2014, 2032, 2033, 2046, 2066, 2079, 2154])

# find first following elements in within array
first_after_leading = []
for _ in leading:
    temp = (within - _).max()

    first_after_leading.append(temp)

# convert to np array
first_after_leading = np.array(first_after_leading)

Ответы [ 3 ]

3 голосов
/ 05 июня 2019

Максимальное вычитание из каждого элемента в leading против всех элементов в within будет вычитанием leading из максимального значения within. Следовательно, просто сделайте -

within.max() - leading

Никаких дополнительных модулей не требуется.

Сроки -

In [79]: np.random.seed(0)
    ...: within = np.random.rand(1000000)
    ...: leading = np.random.rand(400000)

In [80]: %timeit within.max() - leading
1000 loops, best of 3: 850 µs per loop
1 голос
/ 05 июня 2019

С помощью numba вы можете сделать довольно простой перевод вашего кода:

import numba as nb
import numpy as np

def find_leading(leading, within):
    # find first following elements in within array
    first_after_leading = []
    for _ in leading:
        temp = (within - _).max()

        first_after_leading.append(temp)

    # convert to np array
    first_after_leading = np.array(first_after_leading)

    return first_after_leading


@nb.jit(nopython=True)
def find_leading_nb(leading, within):
    # find first following elements in within array
    first_after_leading = np.empty_like(leading)
    for i, _ in enumerate(leading):
        temp = (within - _).max()

        first_after_leading[i] = temp

    return first_after_leading

И затем с вашим исходным вводом:

%timeit find_leading(leading, within)
%timeit find_leading_nb(leading, within)
%timeit (within[:,None] - leading).max(0)
17.3 µs ± 169 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
1.7 µs ± 25.3 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
6.48 µs ± 180 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

, а затем с некоторыми большими массивами:

leading = np.random.randint(0, 100, (1000,))
within = np.random.randint(0, 100, (100000,))

%timeit find_leading(leading, within)
%timeit find_leading_nb(leading, within)
%timeit (within[:,None] - leading).max(0)
145 ms ± 3.82 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
67.4 ms ± 218 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
553 ms ± 4.42 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Времена запускаются с numba 0.44 и numpy 1.16.4 на MacOS Python 3.7

EDIT

Но если я правильно понимаю ваш алгоритм,гораздо более быстрый подход состоит в том, чтобы найти максимум within только один раз, а затем взять разницу с leading, поэтому вам не нужно находить max временного массива в цикле:

@nb.jit(nopython=True)
def find_leading_nb2(leading, within):
    max_within = within.max()
    first_after_leading = np.empty_like(leading)
    for i, x in enumerate(leading):
        first_after_leading[i] = max_within - x
    return first_after_leading

Что дает на исходных входах следующее:

%timeit find_leading_nb2(leading, within)
919 ns ± 8.8 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

и на больших входах следующее:

%timeit find_leading_nb2(leading, within)
21.6 µs ± 180 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
0 голосов
/ 05 июня 2019

Полагаю, что сделать его одним лайнером поможет.Попробуй.

first_after_leading =np.array([(within - _).max() for _ in leading])
...