Поскольку исходные данные недоступны, я создал несколько тестовых данных. Подход заключается в следующем. Если данные представляют собой набор плато со скачками, проверка стандартного отклонения в движущемся окне даст пики при скачках. С порога изолировать пики. Получите оценку скачка, посмотрев данные после применения скользящего среднего. Это обеспечивает хорошую отправную точку для примерки. В качестве функции подгонки я снова использую мой любимый tanh
.
Результат выглядит следующим образом:
import matplotlib.pyplot as plt
import numpy as np
from operator import itemgetter
from itertools import groupby
from scipy.optimize import leastsq
def moving_average( data, windowsize, **kwargs ):
mode=kwargs.pop('mode', 'valid')
weight = np.ones( windowsize )/ float( windowsize )
return np.convolve( data, weight, mode=mode)
def moving_sigma( data, windowsize ):
l = len( data )
sout = list()
for i in range( l - windowsize + 1 ):
sout += [ np.std( data[ i : i + windowsize ] ) ]
return np.array( sout )
def m_step( x, s, x0List, ampList, off ):
assert len( x0List ) == len( ampList )
out = [ a * 0.5 * ( 1 + np.tanh( s * (x - x0) ) ) for a, x0 in zip( ampList, x0List )]
return sum( out ) + off
def residuals( params, xdata, ydata ):
assert not len(params) % 2
off = params[0]
s = params[1]
rest = params[2:]
n = len( rest )
x0 = rest[:n/2]
am = rest[n/2:]
diff = [ y - m_step( x,s, x0, am, off ) for x, y in zip( xdata, ydata ) ]
return diff
### generate data
np.random.seed(776)
a=0
data = list()
for i in range(15):
a = 50 * ( 1 - 2 * np.random.random() )
for _ in range ( np.random.randint( 5, 35 ) ):
data += [a]
xx = np.arange( len( data ), dtype=np.float )
noisydata = list()
for x in data:
noisydata += [ x + np.random.normal( scale=5 ) ]
###define problem specific parameters
myWindow = 10
thresh = 8
cutoffscale = 1.1
### data treatment
avdata = moving_average( noisydata, myWindow )
avx = moving_average( xx, myWindow )
sdata = moving_sigma( noisydata, myWindow )
### getting start values for the fit
seq = [x for x in np.where( sdata > thresh )[0] ]
# from https://stackoverflow.com/a/3149493/803359
jumps = [ map( itemgetter(1), g ) for k, g in groupby( enumerate( seq ), lambda ( i, x ) : i - x ) ]
xjumps = [ [ avx[ k ] for k in xList ] for xList in jumps ]
means = [ np.mean( x ) for x in xjumps ]
### the delta is too small as one only looks above thresh so lets increase it by guessing
deltas = [ ( avdata[ x[-1] ] - avdata[ x[0] ] ) * cutoffscale for x in jumps ]
guess = [ avdata[0], 2] + means + deltas
### fitting
sol, err = leastsq( residuals, guess, args=( xx, noisydata ), maxfev=10000 )
fittedoff = sol[0]
fitteds = sol[1]
fittedx0 = sol[ 2 : 2 + len( means ) ]
fittedam = sol[ 2 + len( means ) : ]
### plotting
fig = plt.figure()
ax = fig.add_subplot( 1, 1, 1 )
# ~ ax.plot( data )
ax.plot( xx, noisydata, label='noisy data' )
ax.plot( avx, avdata, label='moving average' )
ax.plot( avx, sdata, label='window sigma' )
for j in means:
ax.axvline( j, c='#606060', linestyle=':' )
yy = [ m_step( x, 2, means, deltas, avdata[0] ) for x in xx]
bestyy = [ m_step( x, fitteds, fittedx0, fittedam, fittedoff ) for x in xx ]
ax.plot( xx, yy, label='guess' )
ax.plot( xx, bestyy, label='fit' )
plt.legend( loc=0 )
plt.show()
Что дает следующий график:
Немного меньшепрыжки не обнаружены, но должно быть ясно, что это ожидается. Более того, весь процесс не очень быстрый. Из-за порога первоначальное предположение ухудшается до большего x
, чем больше скачков. Наконец, последовательность небольших скачков не обнаруживается, так что результирующий наклон в данных определяется как одно большое плато с очевидной большой ошибкой. Можно было бы ввести дополнительную проверку для этого и добавить положения для переходов, чтобы соответствовать соответственно.
Также обратите внимание, что выбор размера окна определяет, на каком расстоянии два прыжка будут обнаружены как отдельные.