Я пытаюсь реализовать скользящий минимум с амортизированной величиной O (1) get_min()
.Алгоритм амортизации O (1) исходит из принятого ответа в этом посте
Исходная функция:
import pandas as pd
import numpy as np
from numba import njit, prange
def rolling_min_original(data, n):
return pd.Series(data).rolling(n).min().to_numpy()
Моя попытка реализоватьАлгоритм амортизации O (1) get_min()
: (эта функция имеет приличную производительность для немалых n
)
@njit
def rollin_min(data, n):
"""
brief explanations:
param: stk2: the stack2 in the algorithm, except here it only stores the min stack
param: stk2_top: it starts at n-1, and drops gradually until it hits -1 then it comes backup to n-1
if stk2_top= 0 in the current iteration(it will become -1 at the end):
that means stk2_top is pointing at the bottom element in stk2,
after it drops to -1 from 0, in the next iteration, stk2 will be reassigned to a new array data[i-n+1:i+1],
because we need to include the current index.
at each iteration:
if stk2_top <0: (i.e. we have 0 stuff in stk2(aka stk2_top <0)
- copy the past n items(including the current one) to stk2, so that stk2 has n items now
- pick the top min from stk2(stk2_top = n-1 momentarily)
- move down the pointer by 1 after the operation(n-1 becomes n-2)
else: (i.e. we have j(1<=j<= n-1) stuff in stk2)
- pick the top min from stk2(stk2_top is j-1 momentarily)
- move down the pointer by 1 after the operation(j-1 becomes j-2)
"""
if n >1:
def min_getter_rev(arr1):
arr = arr1[::-1]
result = np.empty(len(arr), dtype = arr1.dtype)
result[0]= local_min = arr[0]
for i in range(1,len(arr)):
if arr[i] < local_min:
local_min = arr[i]
result[i] = local_min
return result
result_min= np.empty(len(data), dtype= data.dtype)
for i in prange(n-1):
result_min[i] =np.nan
stk2 = min_getter_rev(data[:n])
stk2_top = n-2#it is n-2 because the loop starts at n(not n-1)which is the second non nan term
stk1_min = data[n-1]#stk1 needs to be the first item of the stk1
result_min[n-1]= min(stk1_min, stk2[-1])
for i in range(n,len(data)):
if stk2_top >= 0:
if data[i] < stk1_min:
stk1_min= min(data[i], stk1_min)#the stk1 min
result_min[i] = min(stk1_min, stk2[stk2_top])#min of the top element in stk2 and the current element
else:
stk2 = min_getter_rev(data[i-n+1:i+1])
stk2_top= n-1
stk1_min = data[i]
result_min[i]= min(stk1_min, stk2[n-1])
stk2_top -= 1
return result_min
else:
return data
Наивная реализация, когда n
мала:
@njit(parallel= True)
def rolling_min_smalln(data, n):
result= np.empty(len(data), dtype= data.dtype)
for i in prange(n-1):
result[i]= np.nan
for i in prange(n-1, len(data)):
result[i]= data[i-n+1: i+1].min()
return result
Небольшой небольшой код для тестирования
def remove_nan(arr):
return arr[~np.isnan(arr)]
if __name__ == '__main__':
np.random.seed(0)
data_size = 200000
data = np.random.uniform(0,1000, size = data_size)+29000
w_size = 37
r_min_original= rolling_min_original(data, w_size)
rmin1 = rollin_min(data, w_size)
r_min_original = remove_nan(r_min_original)
rmin1 = remove_nan(rmin1)
print(np.array_equal(r_min_original,rmin1))
Функция rollin_min()
имеет почти постоянное время выполнения и меньшее время выполнения, чем rolling_min_original()
, когда n
большое, чтоотлично.Но он имеет низкую производительность, когда n
низок (около n < 37
на моем компьютере, в этом диапазоне rollin_min()
может быть легко побежден наивной реализацией rolling_min_smalln()
).
Я изо всех сил пытаюсь найти способы улучшить rollin_min()
, но пока я застрял, поэтому я ищу здесь помощь.
У меня следующие вопросы:
Является ли алгоритм, который я реализую, наилучшим образом для скользящего / скользящего окна мин / макс?
Если нет, то какой алгоритм лучше / лучше?Если да, то как я могу улучшить эту функцию с точки зрения алгоритма?
Помимо самого алгоритма, какие еще способы могут еще больше улучшить производительность функции rollin_min()
?
РЕДАКТИРОВАТЬ: мой последний ответ перенесен в раздел ответов на несколько запросов