Вот «точный» подход.Он решает проблему размером 5,000,000 / 1,000,000
(с поплавками) менее чем за 20 секунд на довольно пешеходном оборудовании.
Я прошу прощения за довольно технический код.Я не уверен, что это можно сделать намного более читабельным.
Основная идея состоит в том, чтобы разбить массив на бинарную древовидную вещь (извините, формальное обучение по scicomp отсутствует).
Например, если у нас есть порции размером в полмиллиона, мы можем отсортировать каждый из них по стоимости linlog, а затем найти вклад любого блока в каждый элемент следующего блока по амортизированной постоянной стоимости.
Сложность состоит в том, как собрать куски разных размеров таким образом, чтобы в итоге мы посчитали все и ровно один раз.
Мой подход состоит в том, чтобы начать с небольших блоков, а затем продолжать объединять пары этих,В принципе, это должно держать стоимость сортировки линейной на каждой итерации, потому что в теории (но не в виде кучи) мы могли бы полностью использовать сортировку меньших кусков.
Как уже упоминалось выше, код сложен в основном потому, что нам нужносравнить правильные элементы с любым конкретным блоком.В основном это сводится к двум правилам: 1) Блок должен полностью содержаться в просмотре элемента.2) блок не должен содержаться в блоке большего размера, который полностью содержится в просмотре элемента.
В любом случае, здесь приведен пример прогона
size 5_000_000, lookback 1_000_000 -- took 14.593 seconds
seems correct -- 10_000 samples checked
и код:
ОБНОВЛЕНИЕ: немного упростил код, также работает быстрее
ОБНОВЛЕНИЕ 2: добавлена версия с "<=" вместо "<" </p>
"<": </strong>
import numpy as np
from numpy.lib.stride_tricks import as_strided
def add_along_axis(a, indices, values, axis):
if axis<0:
axis += a.ndim
I = np.ogrid[(*map(slice, a.shape),)]
I = *I[:axis], indices, *I[axis+1:]
a[I] += values
aaa, taa, paa = add_along_axis, np.take_along_axis, np.put_along_axis
m2f, f2m = np.ravel_multi_index, np.unravel_index
def inv_perm(p):
i = np.empty_like(p)
paa(i, p, np.arange(p.shape[-1]), -1)
return i
def rolling_count_smaller(data, n):
N = len(data)
b = n.bit_length()
NN = (((N-1)>>b)+2)<<b
d0 = np.empty(NN, data.dtype)
d0[NN-N:] = data[::-1]
d0[:NN-N] = data.max() + 1
dt, it, r0 = d0.copy(), np.zeros(NN, int), np.zeros(NN, int)
ch, ch2 = 1, 2
for i in range(b-1):
d0.shape = dt.shape = it.shape = r0.shape = -1, 2, ch
sh = dt.shape
(il, ir), (jl, jr), (k, _) = f2m(m2f(np.add(sh, (-1, -2, -1)), sh) - (n, n-ch), sh)
I = min(il, ir) + 1
bab = np.empty((I, ch2), dt.dtype)
bab[:, ch:] = dt[sh[0]-I:, 0]
IL, IR = np.s_[il-I+1:il+1, ir-I+1:ir+1]
bab[:, k:ch] = d0[IL, jl, k:]
bab[:, :k] = d0[IR, jr, :k]
o = bab.argsort(1, kind='stable')
ns, io = (o>=ch).cumsum(1), inv_perm(o)
r0[IL, jl, k:] += taa(ns, io[:, k:ch], 1)
r0[IR, jr, :k] += taa(ns, io[:, :k], 1)
it[:, 1, :] += ch
dt.shape = it.shape = r0.shape = -1, ch2
o = dt.argsort(1, kind='stable')
ns, io = (o>=ch).cumsum(1), inv_perm(o)
aaa(r0, it[:, :ch], taa(ns, io[:, :ch], 1), 1)
dt, it = taa(dt, o, 1), taa(it, o, 1)
ch, ch2 = ch2, ch2<<1
si, sj = dt.shape
o = as_strided(dt, (si-1, sj<<1), dt.strides).argsort(1, kind='stable')
ns, io = (o>=ch).cumsum(1), inv_perm(o)
r0[:-1, ch2-n-1:] += taa(ns, taa(io, inv_perm(it)[:-1, ch2-n-1:], 1), 1)
return r0.ravel()[:NN-N-1:-1]
l = 1000
data = np.random.randint(-99, 100, (5*l,))
from time import perf_counter as pc
t = pc()
x = rolling_count_smaller(data, l)
t = pc() - t
print(f'size {data.size:_d}, lookback {l:_d} -- took {t:.3f} seconds')
check = 1000
sample = np.random.randint(0, len(x), check)
y = np.array([np.count_nonzero(data[max(0, i-l):i]<data[i]) for i in sample])
assert np.all(y==x[sample])
print(f'seems correct -- {check:_d} samples checked')
"<=": </strong>
import numpy as np
from numpy.lib.stride_tricks import as_strided
def add_along_axis(a, indices, values, axis):
if axis<0:
axis += a.ndim
I = np.ogrid[(*map(slice, a.shape),)]
I = *I[:axis], indices, *I[axis+1:]
a[I] += values
aaa, taa, paa = add_along_axis, np.take_along_axis, np.put_along_axis
m2f, f2m = np.ravel_multi_index, np.unravel_index
def inv_perm(p):
i = np.empty_like(p)
paa(i, p, np.arange(p.shape[-1]), -1)
return i
def rolling_count_smaller(data, n):
N = len(data)
b = n.bit_length()
NN = (((N-1)>>b)+2)<<b
d0 = np.empty(NN, data.dtype)
d0[:N] = data
d0[N:] = data.max() + 1
dt, it, r0 = d0.copy(), np.zeros(NN, int), np.zeros(NN, int)
ch, ch2 = 1, 2
for i in range(b-1):
d0.shape = dt.shape = it.shape = r0.shape = -1, 2, ch
sh = dt.shape
(il, ir), (jl, jr), (k, _) = f2m(m2f((0, 1, 0), sh) + (n-ch+1, n+1), sh)
I = sh[0] - max(il, ir)
bab = np.empty((I, ch2), dt.dtype)
bab[:, :ch] = dt[:I, 1]
IL, IR = np.s_[il:il+I, ir:ir+I]
bab[:, ch+k:] = d0[IL, jl, k:]
bab[:, ch:ch+k] = d0[IR, jr, :k]
o = bab.argsort(1, kind='stable')
ns, io = (o<ch).cumsum(1), inv_perm(o)
r0[IL, jl, k:] += taa(ns, io[:, ch+k:], 1)
r0[IR, jr, :k] += taa(ns, io[:, ch:ch+k], 1)
it[:, 1, :] += ch
dt.shape = it.shape = r0.shape = -1, ch2
o = dt.argsort(1, kind='stable')
ns, io = (o<ch).cumsum(1), inv_perm(o)
aaa(r0, it[:, ch:], taa(ns, io[:, ch:], 1), 1)
dt, it = taa(dt, o, 1), taa(it, o, 1)
ch, ch2 = ch2, ch2<<1
si, sj = dt.shape
o = as_strided(dt, (si-1, sj<<1), dt.strides).argsort(1, kind='stable')
ns, io = (o<ch).cumsum(1), inv_perm(o)
r0[1:, :n+1-ch] += taa(ns, taa(io, ch+inv_perm(it)[1:, :n+1-ch], 1), 1)
return r0.ravel()[:N]
l = 1000
data = np.random.randint(-99, 100, (5*l,))
from time import perf_counter as pc
t = pc()
x = rolling_count_smaller(data, l)
t = pc() - t
print(f'size {data.size:_d}, lookback {l:_d} -- took {t:.3f} seconds')
check = 1000
sample = np.random.randint(0, len(x), check)
y = np.array([np.count_nonzero(data[max(0, i-l):i]<=data[i]) for i in sample])
assert np.all(y==x[sample])
print(f'seems correct -- {check:_d} samples checked')