Из 1 миллиона предметов, предшествующих A [i], сколько из них меньше, чем A [i]? - PullRequest
0 голосов
/ 07 декабря 2018

Пусть A будет массивом 1D размером от 5 до 20 миллионов.

Я бы хотел определить для каждого i, сколько элементов среди A[i-1000000], A[i-999999], ..., A[i-2], A[i-1] меньше, чем A[i].

Сказано по-другому: я ищу пропорцию предметов, меньших A[i], в окне из миллиона предметов, предшествующем A[i].

Я протестировал различные подходы, и было дано несколько ответов в Скользящее сравнение между значением и прошлым окном с процентилем / квантилем :

import numpy as np
A = np.random.random(5*1000*1000)
n = 1000*1000
B = (np.lib.stride_tricks.as_strided(A, shape=(n,A.size-n), strides=(A.itemsize,A.itemsize)) <= A[n:]).sum(0) 
#or similar version with "view_as_windows(A, n)"

Наконец, самым быстрым решением было какое-то наивноеcode + numba:

from numba import jit, prange
@jit(parallel=True)
    def doit(A, n):
        Q = np.zeros(len(A))
        for i in prange(n, len(Q)):
            Q[i] = np.sum(A[i-n:i] <= A[i])
        return(Q)
C = doit(A, n)

Но даже с этим кодом для меня это слишком медленно с длиной A 5 миллионов и n = 1 миллион: около 30 минут, чтобы выполнить это вычисление!

Есть ли более умная идея, чтобы избегал повторного сравнения 1 миллиона элементов для каждого элемента выходных данных?

Примечание: имея приблизительную пропорциюс точностью 10 ^ (- 3), например, «~ 34,3% из 1 миллиона предыдущих предметов меньше, чем A [i]"было бы достаточно.

Ответы [ 6 ]

0 голосов
/ 17 мая 2019

Вот приблизительный подход, который прост в реализации и отвечает за время O (n): (21 секунда для значений 5M на моем ноутбуке).Он должен хорошо работать для наборов данных со значениями, которые отличаются более чем на 1/1000 от наибольшей разницы.

from collections import deque,Counter
def lessCount(A,window):
    precision = 1000 # 1/1000 th of value range
    result    = deque()
    counts    = [0]*(precision+1)
    minVal    = min(A)
    chunkSize = (max(A)-minVal)/precision
    keys      = deque()
    for i,a in enumerate(A):
        key = int((a-minVal)/chunkSize)
        keys.append(key)
        counts[key] += 1       
        lowerCount   = sum(counts[:key])
        result.append(lowerCount)
        if i < window: continue
        counts[keys.popleft()] -= 1
    return np.array(result)

Создает скользящий массив отсчетов, где индекс представляет собой относительную позицию значения, разделенного на куски.Размер порции составляет 1/1000 от наибольшей разницы между значениями.Для каждого элемента в A есть только одно сложение и одно вычитание в массив счетчиков.Число значений ниже текущего равно сумме отсчетов до позиции этого значения в массиве подсчетов.Вы можете увеличить точность по мере необходимости, но имейте в виду, что время будет пропорционально O (n) * точность

0 голосов
/ 15 декабря 2018

Вот пифранизированная версия моего решения.Это примерно в два раза быстрее, и я думаю, что более читабельным, даже если это дольше.Очевидным недостатком является добавленная зависимость от Pythran.

Основная рабочая лошадка - _mergsorted3, она хорошо масштабируется с увеличением размера блока, но сравнительно медленно на небольшом размере блока.

Я написал одного специалиста для размера блока1, чтобы продемонстрировать, насколько большую скорость можно потенциально набрать.

import numpy as np
from _mergesorted2 import _mergesorted_1
from _mergesorted3 import _mergesorted3
from time import perf_counter as pc

USE_SPEC_1 = True
def rolling_count_smaller(D, n, countequal=True):
    N = len(D)
    B = n.bit_length() - 1
    # now: 2^(B+1) >= n > 2^B
    # result and sorter
    R, S = np.zeros(N, int), np.empty(N, int) if USE_SPEC_1 else np.arange(N)
    FL, FH, SL, SH = (np.zeros(3, dt) for dt in 'llll')
    T = pc()
    if USE_SPEC_1:
        _mergesorted_1(D, R, S, n, countequal)
    for b in range(USE_SPEC_1, B):
        print(b, pc()-T)
        T = pc()
        # for each odd block first treat the elements that are so far to its
        # right that they can see that block in full but not the block
        # containing it
        # most of the time (whenever 2^b does not divide n+1) these will  span
        # two blocks, hence fall into two ordered subgroups
        # thus do a threeway merge, but only a "dry run":
        # update the counts R but not the sorter S
        L, BB = n+1, ((n>>b)+1)<<b
        if L == BB:
            Kref = int(countequal)
            SL[1-countequal] = BB
            SH[1-countequal] = BB+(1<<b)
            FL[1-countequal] = BB
            FH[1-countequal] = n+1+(1<<b)
            SL[2] = SH[2] = FL[2] = FH[2] = 0
        else:
            Kref = countequal<<1
            SL[1-countequal:3-countequal] = BB-(1<<b), BB
            SH[1-countequal:3-countequal] = BB, BB+(1<<b)
            FL[1-countequal:3-countequal] = L, BB
            FH[1-countequal:3-countequal] = BB, n+1+(1<<b)
        SL[Kref] = FL[Kref] = 1<<b
        SH[Kref] = FH[Kref] = 1<<(b+1)
        _mergesorted3(D, R, S, SL, SH, FL, FH, N, 1<<(b+1), Kref, False, True)
        # merge pairs of adjacent blocks        
        SL[...] = 0
        SL[1-countequal] = 1<<b
        SH[2] = 0
        SH[:2] = SL[:2] + (1<<b)
        _mergesorted3(D, R, S, SL, SH, FL, FH, N, 1<<(b+1), int(countequal), True, False) 
    # in this last step even and odd blocks are treated the same because
    # neither can be contained in larger valid block
    SL[...] = 0
    SL[1-countequal] = 1<<B
    SH[2] = 0
    SH[int(countequal)] = 1<<B
    SH[1-countequal] = 1<<(B+1)
    FL[...] = 0
    FL[1-countequal] = 1<<B
    FH[2] = 0
    FH[int(countequal)] = 1<<B
    FH[1-countequal] = n+1
    _mergesorted3(D, R, S, SL, SH, FL, FH, N, 1<<B, int(countequal), False, True)
    return R

countequal=True
l = 1_000_000
np.random.seed(0)
data = np.random.randint(-99, 100, (5*l,))
from time import perf_counter as pc
t = pc()
x = rolling_count_smaller(data, l, countequal)
t = pc() - t
print(f'size {data.size:_d}, lookback {l:_d} -- took {t:.3f} seconds')
check = 10
sample = np.random.randint(0, len(x), check)

if countequal:
    y = np.array([np.count_nonzero(data[max(0, i-l):i]<=data[i]) for i in sample])
else:
    y = np.array([np.count_nonzero(data[max(0, i-l):i]<data[i]) for i in sample])
assert np.all(y==x[sample])
print(f'seems correct -- {check:_d} samples checked')    

Основной рабочий _mergesorted3.py.Компиляция: pythran _mergesorted3.py

import numpy as np

#pythran export _mergesorted3(float[:], int[:], int[:], int[3], int[3], int[3], int[3], int, int, int, bool, bool)
#pythran export _mergesorted3(int[:], int[:], int[:], int[3], int[3], int[3], int[3], int, int, int, bool, bool)


# DB, RB, SB are the data, result and sorter arrays; here they are treated a
# bit like base pointers, hence the B in the names
# SL, SH are the low and high ends of the current rows of the three queues
# the next rows are assumed to be at offset N
# FL, FH are low and high ends of ranges in non sorted order used to filter
# each queue. they are ignored if 'filter' is False
# ST is the top index this can fall in the middle of a row which will then be
# processed partially
# Kref is the index of the referenve queue (the one whose elements are counted) 
def _mergesorted3(DB, RB, SB, SL, SH, FL, FH, ST, N, Kref, writeback, filter):
    if writeback: # set up row buffer for writing back of merged sort order
        SLbuf = min(SL[0], SL[1]) # low end of row
        SHbuf = max(SH[0], SH[1]) # high end of row
        Sbuf = np.empty(SHbuf-SLbuf, int) # buffer
        Ibuf = 0 # index
    D = np.empty(3, DB.dtype) # heads of the three queues. values
    S = np.empty(3, int) # heads the three queues. sorters
    while True: # loop over rows
        C = 0 # count of elements in the reference block seen so far
        I = SL.copy() # heads of the three queses. indices
        S[:2] = SB[I[:2]] # the inner loop expects the heads of the two non
                          # active (i.e. not incremented just now) queues
                          # to be in descending order
        if filter: # skip elements that are not within a contiguous range.
                   # this requires filtering because everything is referenced
                   # in sorted order. so we cannot directly select ranges in
                   # the original order
                   # it is the caller's responsibility that for all except
                   # possibly the last row the filtered queues are not empty
            for KK in range(2):
                while S[KK] < FL[KK] or S[KK] >= FH[KK]:
                    I[KK] += 1
                    S[KK] = SB[I[KK]]
        D[:2] = DB[S[:2]] # fetch the first two queue head values
        # and set the inter queue sorter accordingly
        K = np.array([1, 0, 2], int) if D[1] > D[0] else np.array([0, 1, 2], int)
        while I[K[2]] < SH[K[2]]: # loop to merge three rows
            # get a valid new elment from the active queue at sorter level
            S[K[2]] = SB[I[K[2]]]
            if filter and (S[K[2]] < FL[K[2]] or S[K[2]] >= FH[K[2]]):
                I[K[2]] += 1
                continue
            # fetch the corresponding value
            D[K[2]] = DB[S[K[2]]]
            # re-establish inter-queue sort order
            if D[K[2]] > D[K[1]] or (D[K[2]] == D[K[1]] and K[2] < K[1]):
                K[2], K[1] = K[1], K[2]
                if D[K[1]] > D[K[0]] or (D[K[1]] == D[K[0]] and K[1] < K[0]):
                    K[1], K[0] = K[0], K[1]
            # do the book keeping depending on which queue has become active
            if K[2] == Kref: # reference queue: adjust counter
                C += 1
            else: # other: add current ref element count to head of result queue
                RB[S[K[2]]] += C
            I[K[2]] += 1 # advance active queue
        # one queue has been exhausted, which one?
        if K[2] == Kref: # reference queue: no need to sort what's left just 
                         # add the current ref element count to all leftovers
                         # subject to filtering if applicable
            if filter:
                KK = SB[I[K[1]]:SH[K[1]]]
                RB[KK[(KK >= FL[K[1]]) & (KK < FH[K[1]])]] += C
                KK = SB[I[K[0]]:SH[K[0]]]
                RB[KK[(KK >= FL[K[0]]) & (KK < FH[K[0]])]] += C
            else:
                RB[SB[I[K[1]]:SH[K[1]]]] += C
                RB[SB[I[K[0]]:SH[K[0]]]] += C
        else: # one of the other queues: we are left with a two-way merge
              # this is in a separate loop because it also supports writing
              # back the new sort order which we do not need in the three way
              # situation
            while I[K[1]] < SH[K[1]]:
                S[K[1]] = SB[I[K[1]]]
                if filter and (S[K[1]] < FL[K[1]] or S[K[1]] >= FH[K[1]]):
                    I[K[1]] += 1
                    continue
                D[K[1]] = DB[S[K[1]]]
                if D[K[1]] > D[K[0]] or (D[K[1]] == D[K[0]] and K[1] < K[0]):
                    K[1], K[0] = K[0], K[1]
                if K[1] == Kref:
                    C += 1
                else:
                    RB[S[K[1]]] += C
                if writeback: # we cannot directly write back without messing
                              # things up. instead we buffer one row at a time
                    Sbuf[Ibuf] = S[K[1]]
                    Ibuf += 1
                I[K[1]] += 1
            # a second queue has been exhausted. which one?
            if K[1] == Kref: # the reference queue: must update results in
                             # the remainder of the other queue
                if filter:
                    KK = SB[I[K[0]]:SH[K[0]]]
                    RB[KK[(KK >= FL[K[0]]) & (KK < FH[K[0]])]] += C
                else:
                    RB[SB[I[K[0]]:SH[K[0]]]] += C
            if writeback: # write back updated order
                # the leftovers of the last remaining queue have not been
                # buffered but being contiguous can be written back directly
                # the way this is used by the main script actually gives a
                # fifty-fifty chance of copying something exactly onto itself
                SB[SLbuf+Ibuf:SHbuf] = SB[I[K[0]]:SH[K[0]]]
                # now copy the buffer
                SB[SLbuf:SLbuf+Ibuf] = Sbuf[:Ibuf]
                SLbuf += N; SHbuf += N
                Ibuf = 0
        SL += N; SH += N
        if filter:
            FL += N; FH += N
        # this is ugly:
        # going to the next row we must check whether one or more queues
        # have fully or partially hit the ceiling ST.
        # if two and fully we are done
        # if one fully we must alter the queue indices to make sure the
        # empty queue is at index 2, because of the requirement of having
        # at least one valid element in queues 0 and 1
        done = -1
        for II in range(3):
            if SH[II] == SL[II]:
                if done >= 0:
                    done = -2
                    break
                done = II
            elif SH[II] > ST:
                if SL[II] >= ST or (filter and FL[II] >= ST):
                    if done >= 0:
                        done = -2
                        break
                    done = II
                    if writeback:
                        SHbuf -= SH[II] - SL[II]
                    SH[II] = SL[II] = 0
                else:
                    if writeback:
                        SHbuf -= SH[II] - ST
                    SH[II] = ST
                if filter and FH[II] > ST:
                    FH[II] = ST
        if done == Kref or done == -2:
            break
        elif done == 0:
            SL[:2], SH[:2] = SL[1:], SH[1:]
            if filter:
                FL[:2], FH[:2] = FL[1:], FH[1:]
            SH[2] = SL[2]
            Kref -= 1
        elif done == 1:
            SL[1], SH[1] = SL[2], SH[2]
            if filter:
                FL[1], FH[1] = FL[2], FH[2]
            SH[2] = SL[2]
            Kref >>= 1

И особый случай _mergesorted2.py - pythran _mergesorted2.py

import numpy as np

#pythran export _mergesorted_1(float[:], int[:], int[:], int, bool)
#pythran export _mergesorted_1(int[:], int[:], int[:], int, bool)

def _mergesorted_1(DB, RB, SB, n, countequal):
    N = len(DB)
    K = ((N-n-1)>>1)<<1
    for i in range(0, K, 2):
        if DB[i] < DB[i+1] or (countequal and DB[i] == DB[i+1]):
            SB[i] = i
            SB[i+1] = i+1
            RB[i+1] += 1
        else:
            SB[i] = i+1
            SB[i+1] = i
        if DB[i+1] < DB[i+1+n] or (countequal and DB[i+1] == DB[i+1+n]):
            RB[i+1+n] += 1
    for i in range(K, (N>>1)<<1, 2):
        if DB[i] < DB[i+1] or (countequal and DB[i] == DB[i+1]):
            SB[i] = i
            SB[i+1] = i+1
            RB[i+1] += 1
        else:
            SB[i] = i+1
            SB[i+1] = i
    if N & 1:
        SB[N-1] = N-1
0 голосов
/ 11 декабря 2018

Повторное размещение содержания моего комментария по запросу @ Basj:

Мысль

Предположим, что для размера окна k вы используете окно A[i-k: i]не для элемента A[i], а для одного из его соседей A[i+1] (или A[i-1]).

Содержимое этого окна A[i-k:i] практически идентично содержимому "истинного окна для * 1015".* ", A[i-k+1: i+1];k-1 их элементов одинаковы, только с 1 (потенциально) несовпадающим элементом.Это повлияет на количество меньших для A[i+1] не более чем на 1;либо измененный элемент считается, когда реальный не будет, или наоборот.Таким образом, самое большее, меньший счет для A[i+1] будет отклоняться от «истинного счета для A[i+1]» не более чем на 1 .

По той же логике, делая то же самое дляA[i+2] (или A[i-2]) даст вам максимальное отклонение 2, а в целом, то же самое для A[i+j] даст вам максимальное отклонение abs(j).

Так что если ваша цельточность равна 1e-3, что означает, что допустимая ошибка равна половине этого значения, 5e-4, тогда вместо этого вы можете приблизить результаты для всего набора значений A[i+j] for j in range(int(-k * 5e-4), int(k * 5e-4)), просто повторно используя одно и то же окно A[i-k: i] для каждого A[i+j].

... И что теперь?

Вы можете просто настроить свой код для подсчета меньших чисел в этом скорректированном окне для каждого A[i+j] и увеличить i на k*1e-3 кусков.

... но это не спасет вас ни разу.Вы по-прежнему берете кусок из k чисел и считаете количество значений меньше некоторого справочного значения a, и делаете это для 5 миллионов a.Это именно то, что вы делали раньше.

Так что вопрос: как вы можете злоупотреблять повторением, чтобы сэкономить время?

@ Basj Я оставлюОстальная часть этой мысли вам.В конце концов, это финальный сезон;]

0 голосов
/ 10 декабря 2018

Извините, что не реализовал мою идею для вас;У меня сейчас нет времени.Но я надеюсь, что это поможет!

Обозначения

Я буду использовать n в качестве размера массива и k в качестве размера окна.

Концепция

Для каждого элемента A[i] построить дерево сплайнов , упорядочив все элементы a in A[max(0, i-k): i+1], а затем использовать дерево сплайсов для подсчета количества элементов a < A[i].Преимущество здесь состоит в том, что деревья сопряжения для соседних элементов A[i] & A[i+1] будут отличаться только вставкой одного узла и (для i > k) удалением одного узла, что сокращает время, необходимое для построения деревьев сопряжения.

Обязательные операции имеют следующие сложности:

  • для каждого i: O(n * ?)
    • добавление A[i] в качестве узла к дереву отображения: амортизированное O(log k)
    • , считая a < A[i]: поскольку добавление A[i] помещает его в корневую позицию, вам нужно только проверить счетчик размера левой ветви -> O(1)
    • , удалив узел A[i-k-1]:амортизируется O(log k)

Общая сложность: амортизируется O(n log(k))

0 голосов
/ 08 декабря 2018

Вот «точный» подход.Он решает проблему размером 5,000,000 / 1,000,000 (с поплавками) менее чем за 20 секунд на довольно пешеходном оборудовании.

Я прошу прощения за довольно технический код.Я не уверен, что это можно сделать намного более читабельным.

Основная идея состоит в том, чтобы разбить массив на бинарную древовидную вещь (извините, формальное обучение по scicomp отсутствует).

Например, если у нас есть порции размером в полмиллиона, мы можем отсортировать каждый из них по стоимости linlog, а затем найти вклад любого блока в каждый элемент следующего блока по амортизированной постоянной стоимости.

Сложность состоит в том, как собрать куски разных размеров таким образом, чтобы в итоге мы посчитали все и ровно один раз.

Мой подход состоит в том, чтобы начать с небольших блоков, а затем продолжать объединять пары этих,В принципе, это должно держать стоимость сортировки линейной на каждой итерации, потому что в теории (но не в виде кучи) мы могли бы полностью использовать сортировку меньших кусков.

Как уже упоминалось выше, код сложен в основном потому, что нам нужносравнить правильные элементы с любым конкретным блоком.В основном это сводится к двум правилам: 1) Блок должен полностью содержаться в просмотре элемента.2) блок не должен содержаться в блоке большего размера, который полностью содержится в просмотре элемента.

В любом случае, здесь приведен пример прогона

size 5_000_000, lookback 1_000_000 -- took 14.593 seconds
seems correct -- 10_000 samples checked

и код:

ОБНОВЛЕНИЕ: немного упростил код, также работает быстрее

ОБНОВЛЕНИЕ 2: добавлена ​​версия с "<=" вместо "<" </p>

"<": </strong>

import numpy as np
from numpy.lib.stride_tricks import as_strided

def add_along_axis(a, indices, values, axis):
    if axis<0:
        axis += a.ndim
    I = np.ogrid[(*map(slice, a.shape),)]
    I = *I[:axis], indices, *I[axis+1:]
    a[I] += values

aaa, taa, paa = add_along_axis, np.take_along_axis, np.put_along_axis
m2f, f2m = np.ravel_multi_index, np.unravel_index

def inv_perm(p):
    i = np.empty_like(p)
    paa(i, p, np.arange(p.shape[-1]), -1)
    return i

def rolling_count_smaller(data, n):
    N = len(data)
    b = n.bit_length()
    NN = (((N-1)>>b)+2)<<b
    d0 = np.empty(NN, data.dtype)
    d0[NN-N:] = data[::-1]
    d0[:NN-N] = data.max() + 1
    dt, it, r0 = d0.copy(), np.zeros(NN, int), np.zeros(NN, int)
    ch, ch2 = 1, 2
    for i in range(b-1):
        d0.shape = dt.shape = it.shape = r0.shape = -1, 2, ch
        sh = dt.shape
        (il, ir), (jl, jr), (k, _) = f2m(m2f(np.add(sh, (-1, -2, -1)), sh) - (n, n-ch), sh)
        I = min(il, ir) + 1
        bab = np.empty((I, ch2), dt.dtype)
        bab[:, ch:] = dt[sh[0]-I:, 0]
        IL, IR = np.s_[il-I+1:il+1, ir-I+1:ir+1]
        bab[:, k:ch] = d0[IL, jl, k:]
        bab[:, :k] = d0[IR, jr, :k]
        o = bab.argsort(1, kind='stable')
        ns, io = (o>=ch).cumsum(1), inv_perm(o)
        r0[IL, jl, k:] += taa(ns, io[:, k:ch], 1)
        r0[IR, jr, :k] += taa(ns, io[:, :k], 1)

        it[:, 1, :] += ch
        dt.shape = it.shape = r0.shape = -1, ch2
        o = dt.argsort(1, kind='stable')
        ns, io = (o>=ch).cumsum(1), inv_perm(o)
        aaa(r0, it[:, :ch], taa(ns, io[:, :ch], 1), 1)
        dt, it = taa(dt, o, 1), taa(it, o, 1)
        ch, ch2 = ch2, ch2<<1
    si, sj = dt.shape
    o = as_strided(dt, (si-1, sj<<1), dt.strides).argsort(1, kind='stable')
    ns, io = (o>=ch).cumsum(1), inv_perm(o)
    r0[:-1, ch2-n-1:] += taa(ns, taa(io, inv_perm(it)[:-1, ch2-n-1:], 1), 1)
    return r0.ravel()[:NN-N-1:-1]

l = 1000
data = np.random.randint(-99, 100, (5*l,))
from time import perf_counter as pc
t = pc()
x = rolling_count_smaller(data, l)
t = pc() - t
print(f'size {data.size:_d}, lookback {l:_d} -- took {t:.3f} seconds')
check = 1000
sample = np.random.randint(0, len(x), check)
y = np.array([np.count_nonzero(data[max(0, i-l):i]<data[i]) for i in sample])
assert np.all(y==x[sample])
print(f'seems correct -- {check:_d} samples checked')    

"<=": </strong>

import numpy as np
from numpy.lib.stride_tricks import as_strided

def add_along_axis(a, indices, values, axis):
    if axis<0:
        axis += a.ndim
    I = np.ogrid[(*map(slice, a.shape),)]
    I = *I[:axis], indices, *I[axis+1:]
    a[I] += values

aaa, taa, paa = add_along_axis, np.take_along_axis, np.put_along_axis
m2f, f2m = np.ravel_multi_index, np.unravel_index

def inv_perm(p):
    i = np.empty_like(p)
    paa(i, p, np.arange(p.shape[-1]), -1)
    return i

def rolling_count_smaller(data, n):
    N = len(data)
    b = n.bit_length()
    NN = (((N-1)>>b)+2)<<b
    d0 = np.empty(NN, data.dtype)
    d0[:N] = data
    d0[N:] = data.max() + 1
    dt, it, r0 = d0.copy(), np.zeros(NN, int), np.zeros(NN, int)
    ch, ch2 = 1, 2
    for i in range(b-1):
        d0.shape = dt.shape = it.shape = r0.shape = -1, 2, ch
        sh = dt.shape
        (il, ir), (jl, jr), (k, _) = f2m(m2f((0, 1, 0), sh) + (n-ch+1, n+1), sh)
        I = sh[0] - max(il, ir)
        bab = np.empty((I, ch2), dt.dtype)
        bab[:, :ch] = dt[:I, 1]
        IL, IR = np.s_[il:il+I, ir:ir+I]
        bab[:, ch+k:] = d0[IL, jl, k:]
        bab[:, ch:ch+k] = d0[IR, jr, :k]
        o = bab.argsort(1, kind='stable')
        ns, io = (o<ch).cumsum(1), inv_perm(o)
        r0[IL, jl, k:] += taa(ns, io[:, ch+k:], 1)
        r0[IR, jr, :k] += taa(ns, io[:, ch:ch+k], 1)
        it[:, 1, :] += ch
        dt.shape = it.shape = r0.shape = -1, ch2
        o = dt.argsort(1, kind='stable')
        ns, io = (o<ch).cumsum(1), inv_perm(o)
        aaa(r0, it[:, ch:], taa(ns, io[:, ch:], 1), 1)
        dt, it = taa(dt, o, 1), taa(it, o, 1)
        ch, ch2 = ch2, ch2<<1
    si, sj = dt.shape
    o = as_strided(dt, (si-1, sj<<1), dt.strides).argsort(1, kind='stable')
    ns, io = (o<ch).cumsum(1), inv_perm(o)
    r0[1:, :n+1-ch] += taa(ns, taa(io, ch+inv_perm(it)[1:, :n+1-ch], 1), 1)
    return r0.ravel()[:N]

l = 1000
data = np.random.randint(-99, 100, (5*l,))
from time import perf_counter as pc
t = pc()
x = rolling_count_smaller(data, l)
t = pc() - t
print(f'size {data.size:_d}, lookback {l:_d} -- took {t:.3f} seconds')
check = 1000
sample = np.random.randint(0, len(x), check)
y = np.array([np.count_nonzero(data[max(0, i-l):i]<=data[i]) for i in sample])
assert np.all(y==x[sample])
print(f'seems correct -- {check:_d} samples checked')    
0 голосов
/ 07 декабря 2018

Первая попытка ответа, исходя из предположения (из комментариев)

мы могли бы также использовать 16-битные целые числа, предварительно умножив A на 32768 и округлив.Точности было бы достаточно с int16

Предполагая, что мы работаем с числами int16: я бы попытался сохранить относительно небольшой массив размером 2**16, считая, сколько раз каждое число появлялось за последние 1 м.окно.Поддержание массива - O(1), так как с каждым приращением индекса вы просто уменьшаете 1 счетчик числа, которое окно просто «влево», и увеличиваете «новый» номер.Затем подсчет того, сколько чисел в окне меньше текущего числа, сводится к суммированию массива по всем индексам вплоть до (исключая) текущего числа.

Предполагая, что A[i] находится в диапазоне [-32768, 32768]:

B = np.zeros(2 * 32768 + 1)
Q = np.zeros(len(A))
n = 1000 * 1000

def adjust_index(i):
    return int(i) + 32768

for i in range(len(Q)):
    if i >= n + 1:
        B[adjust_index(A[i - n - 1])] -= 1
    if i > 0:
        B[adjust_index(A[i - 1])] += 1
    Q[i] = B[:adjust_index(A[i])].sum() / float(n)

Это запустилось на моей машине примерно за одну минуту.

Вы можете обменять пространство и некоторую скорость на точность, используя больший (или меньший) диапазон целых чисел (например, умножение на 2**17 вместо 2**16 для получения более точных данных за счет некоторой скорости; умножение на 2**15 для получения результатов быстрее, но менее точно).

...