Во-первых, давайте сделаем вашу функцию немного более эффективной. Вы выполняете некоторые ненужные операции индексирования: вместо y_left[lcond].shape[0]
вам просто нужно lcond.sum()
или len(lcond.nonzero()[0])
, что кажется быстрее.
Вот улучшенная цикличная версия вашего кода (в комплекте с фиктивным вводом):
import numpy as np
n = 1000
X = np.random.randint(0,n,n)
Y = np.random.randint(0,n,n)
S = np.random.choice(n//2, n)
def f2(x, y, s):
"""Same loopy solution as original, only faster"""
n = x.size
isone = y == 1
lval = len(isone[:s].nonzero()[0])
rval = len(isone[s:].nonzero()[0])
hleft = 1 - (lval**2 + (s - lval)**2) / n**2
hright = 1 - (rval**2 + (n - s - rval)**2) / n**2
return - s / n * hleft - (n - s) / n * hright
def time_newloop():
"""Callable front-end for timing comparisons"""
results = np.empty(len(S))
for i in range(len(S)):
results[i] = f2(X, Y, S[i])
return results
Изменения довольно просты.
Теперь получается, что мы действительно можем векторизовать ваши циклы. Для этого мы должны сравнить, используя каждый элемент S
одновременно. Способ, которым мы можем сделать это, создает 2d маску формы (nS, n)
(где S.size == nS
), которая обрезает значения до соответствующего элемента S
. Вот как это сделать:
def f3(X, Y, S):
"""Vectorized solution working on all the data at the same time"""
n = X.size
leftmask = np.arange(n) < S[:,None] # boolean, shape (nS, n)
rightmask = ~leftmask # boolean, shape (nS, n)
isone = Y == 1 # shape (n,)
lval = (isone & leftmask).sum(axis=1) # shape (nS,)
rval = (isone & rightmask).sum(axis=1) # shape (nS,)
hleft = 1 - (lval**2 + (S - lval)**2) / n**2
hright = 1 - (rval**2 + (n - S - rval)**2) / n**2
return - S / n * hleft - (n - S) / n * hright # shape (nS,)
def time_vector():
"""Trivial front-end for fair timing"""
return f3(X,Y,S)
Определив исходное решение для запуска как time_orig()
, мы можем проверить, что результаты совпадают:
>>> np.array_equal(time_orig(), time_newloop()), np.array_equal(time_orig(), time_vector())
(True, True)
И время выполнения с вышеуказанными случайными входами:
>>> %timeit time_orig()
... %timeit time_newloop()
... %timeit time_vector()
...
...
19 ms ± 501 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
11.4 ms ± 214 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
3.93 ms ± 37.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Это означает, что вышеприведенная петлевая версия почти в два раза быстрее, чем оригинальная петлевая, а векторизованная версия - еще один фактор в три раза быстрее. Конечно, цена последнего улучшения заключается в увеличении потребности в памяти: вместо массивов (n,)
теперь у вас есть массивы (nS, n)
, которые могут стать довольно большими, если ваши входные массивы огромны. Но, как говорится, бесплатного обеда нет, с векторизацией вы часто тратите время на обмен памяти.