Амортизированный минимум прокрутки O (1) реализован в Python Numba / NumPy - PullRequest
1 голос
/ 22 сентября 2019

Я пытаюсь реализовать скользящий минимум с амортизированной величиной O (1) get_min().Алгоритм амортизации O (1) исходит из принятого ответа в этом посте


Исходная функция:

import pandas as pd
import numpy as np
from numba import njit, prange

def rolling_min_original(data, n):
    return pd.Series(data).rolling(n).min().to_numpy()

Моя попытка реализоватьАлгоритм амортизации O (1) get_min(): (эта функция имеет приличную производительность для немалых n)

@njit
def rollin_min(data, n):
    """
        brief explanations:

        param: stk2: the stack2 in the algorithm, except here it only stores the min stack
        param: stk2_top: it starts at n-1, and drops gradually until it hits -1 then it comes backup to n-1
            if stk2_top= 0 in the current iteration(it will become -1 at the end):
                that means stk2_top is pointing at the bottom element in stk2, 
            after it drops to -1 from 0, in the next iteration, stk2 will  be reassigned to a new array data[i-n+1:i+1],
            because we need to include the current index. 

        at each iteration:
        if stk2_top <0: (i.e. we have 0 stuff in stk2(aka stk2_top <0)
            - copy the past n items(including the current one) to stk2, so that stk2 has n items now
            - pick the top min from stk2(stk2_top = n-1 momentarily)
            - move down the pointer by 1 after the operation(n-1 becomes n-2)

        else: (i.e. we have j(1<=j<= n-1) stuff in stk2)
            - pick the top min from stk2(stk2_top is j-1 momentarily)
            - move down the pointer by 1 after the operation(j-1 becomes j-2)

    """


    if n >1:  

        def min_getter_rev(arr1):
            arr = arr1[::-1]
            result = np.empty(len(arr), dtype = arr1.dtype)
            result[0]= local_min = arr[0]

            for i in range(1,len(arr)):
                if arr[i] < local_min:
                    local_min = arr[i]
                result[i] = local_min
            return result

        result_min= np.empty(len(data), dtype= data.dtype)
        for i in prange(n-1):
            result_min[i] =np.nan


        stk2 = min_getter_rev(data[:n])
        stk2_top = n-2#it is n-2 because the loop starts at n(not n-1)which is the second non nan term
        stk1_min = data[n-1]#stk1 needs to be the first item of the stk1
        result_min[n-1]= min(stk1_min, stk2[-1])    

        for i in range(n,len(data)):
            if stk2_top >= 0:
                if data[i] < stk1_min:
                    stk1_min= min(data[i], stk1_min)#the stk1 min
                result_min[i] = min(stk1_min, stk2[stk2_top])#min of the top element in stk2 and the current element

            else:
                stk2 = min_getter_rev(data[i-n+1:i+1])
                stk2_top= n-1
                stk1_min = data[i]
                result_min[i]= min(stk1_min, stk2[n-1])

            stk2_top -= 1

        return result_min   
    else:
        return data

Наивная реализация, когда n мала:

@njit(parallel= True)
def rolling_min_smalln(data, n):
    result= np.empty(len(data), dtype= data.dtype)

    for i in prange(n-1):
        result[i]= np.nan

    for i in prange(n-1, len(data)):
        result[i]= data[i-n+1: i+1].min()

    return result

Небольшой небольшой код для тестирования

def remove_nan(arr):
    return arr[~np.isnan(arr)]

if __name__ == '__main__':

    np.random.seed(0)
    data_size = 200000
    data = np.random.uniform(0,1000, size = data_size)+29000

    w_size = 37

    r_min_original= rolling_min_original(data, w_size)
    rmin1 = rollin_min(data, w_size)

    r_min_original = remove_nan(r_min_original)
    rmin1 = remove_nan(rmin1)

    print(np.array_equal(r_min_original,rmin1))

Функция rollin_min() имеет почти постоянное время выполнения и меньшее время выполнения, чем rolling_min_original(), когда n большое, чтоотлично.Но он имеет низкую производительность, когда n низок (около n < 37 на моем компьютере, в этом диапазоне rollin_min() может быть легко побежден наивной реализацией rolling_min_smalln()).

Я изо всех сил пытаюсь найти способы улучшить rollin_min(), но пока я застрял, поэтому я ищу здесь помощь.


У меня следующие вопросы:

Является ли алгоритм, который я реализую, наилучшим образом для скользящего / скользящего окна мин / макс?

Если нет, то какой алгоритм лучше / лучше?Если да, то как я могу улучшить эту функцию с точки зрения алгоритма?

Помимо самого алгоритма, какие еще способы могут еще больше улучшить производительность функции rollin_min()?


РЕДАКТИРОВАТЬ: мой последний ответ перенесен в раздел ответов на несколько запросов

Ответы [ 2 ]

3 голосов
/ 23 сентября 2019

Основной причиной медлительности в вашем коде, вероятно, является выделение нового массива в min_getter_rev.Вы должны повторно использовать одно и то же хранилище.

Тогда, поскольку вам не нужно реализовывать очередь, вы можете проводить больше оптимизаций.Например, размер двух стеков не больше (и обычно) n, поэтому вы можете хранить их в одном массиве с размером n.Вырастите один с начала и один с конца.

Вы заметите, что существует очень регулярный шаблон - заполняйте массив от начала до конца по порядку, пересчитывайте минимумы с конца, генерируйте выходные данные при пополнениимассив, повторите ...

Это приводит к действительно более простому алгоритму с более простым объяснением, которое вообще не относится к стекам.Вот реализация, с комментариями о том, как это работает.Обратите внимание, что я не стал начинать с NaNs:

def rollin_min(data, n):

    #allocate the result.  Note the number valid windows is len(data)-(n-1)
    result = np.empty(len(data)-(n-1), data.dtype)

    #every nth position is a "mark"
    #every window therefore contains exactly 1 mark
    #the minimum in the window is the minimum of:
    #  the minimum from the window start to the following mark; and
    #  the minimum from the window end the the preceding (same) mark

    #calculate the minimum from every window start index to the next mark
    for mark in range(n-1, len(data), n):
        v = data[mark]
        if (mark < len(result)):
            result[mark] = v
        for i in range(mark-1, mark-n, -1):
            v = min(data[i],v)
            if (i < len(result)):
                result[i] = v

    #for each window, calculate the running total from the preceding mark
    # to its end.  The first window ends at the first mark
    #then combine it with the first distance to get the window minimum

    nextMarkPos = 0
    for i in range(0,len(result)):
        if i == nextMarkPos:
             v = data[i+n-1]
             nextMarkPos += n
        else:
            v = min(data[i+n-1],v)
        result[i] = min(result[i],v)

    return result
1 голос
/ 28 сентября 2019

Перемещено это из раздела Вопрос РЕДАКТИРОВАТЬ сюда по нескольким запросам.

Вдохновленный более простой реализацией, предложенной Мэттом Тиммермансом в ответе, я сделал многоядерную версию процессора для скользящего минимума.Код выглядит следующим образом:

@njit(parallel= True)
def rollin_min2(data, n):
    """
    1) make a loop that iterates over K sections of n elements; each section is independent so that it can benefit from multicores cpu 
    2) for each iteration of sections, generate backward local minimum(sec_min2) and forward minimum(sec_min1)

    say m=8, len(data)= 23, then we only need the idx= (reversed to 7,6,5,...,1,0 (0 means minimum up until idx= 0)),
    1st iter
    result[7]= min_until 0,
    result[8]= min(min(data[7:9]) and min_until 1),
    result[9]= min(min(data[7:10]) and m_til 2)
    ...
    result[14]= min(min(data[7:15]) and m_til 7) 

    2nd iter
    result[15]= min_until 8,
    result[16]= min(min(data[15:17]) and m_til 9),
    result[17]= min(min(data[15:18]) and m_til 10)
    ...
    result[22]= min(min(data[15:23]) and m_til 15) 


    """
    ar_len= len(data)

    sec_min1= np.empty(ar_len, dtype = data.dtype)
    sec_min2= np.empty(ar_len, dtype = data.dtype)

    for i in prange(n-1):
        sec_min1[i]= np.nan

    for sec in prange(ar_len//n):
        s2_min= data[n*sec+ n-1]
        s1_min= data[n*sec+ n]

        for i in range(n-1,-1,-1):
            if data[n*sec+i] < s2_min:
                s2_min= data[n*sec+i]
            sec_min2[n*sec+i]= s2_min

        sec_min1[n*sec+ n-1]= sec_min2[n*sec]

        for i in range(n-1):
            if n*sec+n+i < ar_len:
                if data[n*sec+n+i] < s1_min:
                    s1_min= data[n*sec+n+i]
                sec_min1[n*sec+n+i]= min(s1_min, sec_min2[n*sec+i+1])

            else:
                break

    return sec_min1 

Я фактически потратил час на тестирование различных реализаций прокатки мин.В моем ноутбуке 6C / 12T эта многоядерная версия работает лучше всего, когда n «среднего размера».Когда n составляет не менее 30% длины исходных данных, другая реализация начинает затмевать.Должны быть еще лучшие способы улучшить эту функцию, но на момент этого редактирования я еще не знал о них.

...