Получение интервалов из данных временного ряда - PullRequest
4 голосов
/ 02 октября 2019

У меня довольно странный вопрос. В настоящее время я работаю с данными временных рядов и имею несколько пиков в моем наборе данных. Эти данные собираются с использованием машины регистрации нейтронной плотности, и она описывает событие, записанное датчиком непрерывно в течение некоторого времени. Пики в данных соответствуют некоторому интересному интервалу, когда эта машина спускается в ствол скважины. Итак, вершина важна. Однако важны не только пики. Это весь интервал (или, по крайней мере, как я его описываю; см. Прилагаемый рисунок). Теперь мой вопрос заключается в том, есть ли способ обработки сигнала (предпочтительно в Python), который я могу использовать или изучить, который может позволить мне преобразовать этот сигнал в разные интервалы, каждый из которых соответствует локальным минимумам / максимумам?

Мой первоначальный подход заключался в использовании алгоритмов обнаружения разрыва, как описано в Kleinberg 2002, но у меня ничего не вышло, поэтому я хочу узнать мнение других людей по этому поводу.

Это исходные данные:

graph of proportion of target events vs. time

Вот что я хочу сделать:

annotated graph

Ответы [ 2 ]

1 голос
/ 08 октября 2019

Поскольку исходные данные недоступны, я создал несколько тестовых данных. Подход заключается в следующем. Если данные представляют собой набор плато со скачками, проверка стандартного отклонения в движущемся окне даст пики при скачках. С порога изолировать пики. Получите оценку скачка, посмотрев данные после применения скользящего среднего. Это обеспечивает хорошую отправную точку для примерки. В качестве функции подгонки я снова использую мой любимый tanh.

Результат выглядит следующим образом:

import matplotlib.pyplot as plt
import numpy as np
from operator import itemgetter
from itertools import groupby
from scipy.optimize import leastsq

def moving_average( data, windowsize, **kwargs ):
    mode=kwargs.pop('mode', 'valid')
    weight = np.ones( windowsize )/ float( windowsize )
    return np.convolve( data, weight, mode=mode)

def moving_sigma( data, windowsize ):
    l = len( data )
    sout = list()
    for i in range( l - windowsize + 1 ):
        sout += [ np.std( data[ i : i + windowsize ] ) ]
    return np.array( sout )

def m_step( x, s, x0List, ampList, off ):
    assert len( x0List ) == len( ampList )
    out = [ a * 0.5 * ( 1 + np.tanh( s * (x - x0) ) ) for a, x0 in zip( ampList, x0List )]
    return sum( out ) + off

def residuals( params, xdata, ydata ):
    assert not len(params) % 2
    off = params[0]
    s = params[1]
    rest = params[2:]
    n = len( rest )
    x0 = rest[:n/2]
    am = rest[n/2:]
    diff = [ y - m_step( x,s, x0, am, off ) for x, y in zip( xdata, ydata ) ]
    return diff 

### generate data
np.random.seed(776)
a=0
data = list()
for i in range(15):
    a = 50 * ( 1 - 2 * np.random.random() )
    for _ in range ( np.random.randint( 5, 35 ) ):
        data += [a]

xx =  np.arange( len( data ), dtype=np.float )

noisydata = list()
for x in data:
    noisydata += [ x + np.random.normal( scale=5 ) ]

###define problem specific parameters
myWindow = 10
thresh = 8
cutoffscale = 1.1

### data treatment
avdata = moving_average( noisydata, myWindow )
avx = moving_average( xx, myWindow )
sdata = moving_sigma( noisydata, myWindow )

### getting start values for the fit
seq = [x for x in np.where( sdata > thresh )[0] ]
# from https://stackoverflow.com/a/3149493/803359
jumps = [ map( itemgetter(1), g ) for k, g in groupby( enumerate( seq ), lambda ( i, x ) : i - x ) ]
xjumps = [ [ avx[ k ] for k in xList ] for xList in jumps ]
means = [ np.mean( x ) for x in xjumps ]
### the delta is too small as one only looks above thresh so lets increase it by guessing
deltas = [ ( avdata[ x[-1] ] - avdata[ x[0] ] ) * cutoffscale for x in jumps ]

guess = [ avdata[0], 2] + means + deltas

### fitting
sol, err = leastsq( residuals, guess, args=( xx, noisydata ), maxfev=10000 )
fittedoff = sol[0]
fitteds = sol[1]
fittedx0 = sol[ 2 : 2 + len( means ) ] 
fittedam = sol[ 2 + len( means ) : ] 

### plotting
fig = plt.figure()
ax = fig.add_subplot( 1, 1, 1 )
# ~ ax.plot( data )
ax.plot( xx, noisydata, label='noisy data' )
ax.plot( avx, avdata, label='moving average' )
ax.plot( avx, sdata, label='window sigma' )

for j in means:
    ax.axvline( j, c='#606060', linestyle=':' )

yy = [ m_step( x, 2, means, deltas, avdata[0] ) for x in xx]
bestyy = [ m_step( x, fitteds, fittedx0, fittedam, fittedoff ) for x in xx ]
ax.plot( xx, yy, label='guess' )
ax.plot( xx, bestyy, label='fit' )

plt.legend( loc=0 )
plt.show()

Что дает следующий график:

enter image description here

Немного меньшепрыжки не обнаружены, но должно быть ясно, что это ожидается. Более того, весь процесс не очень быстрый. Из-за порога первоначальное предположение ухудшается до большего x, чем больше скачков. Наконец, последовательность небольших скачков не обнаруживается, так что результирующий наклон в данных определяется как одно большое плато с очевидной большой ошибкой. Можно было бы ввести дополнительную проверку для этого и добавить положения для переходов, чтобы соответствовать соответственно.

Также обратите внимание, что выбор размера окна определяет, на каком расстоянии два прыжка будут обнаружены как отдельные.

0 голосов
/ 04 октября 2019

Я извлек данные из диаграммы рассеяния для анализа, и здесь я впервые рассмотрел проблему с примером кода для построения. То, что это делает, это подгоняет полином третьего порядка ко всем данным в качестве базовой линии, а затем ищет точки данных на расстоянии среза выше или ниже этой базовой линии в части кода, помеченной как «прямоугольное сечение». Я надеюсь, что это может по крайней мере предложить путь к решению проблемы. Одним из улучшений, которое пришло на ум, было внедрение минимальной ширины в коде «прямоугольности».

plot

import numpy, matplotlib
import matplotlib.pyplot as plt


##########################################################
# data load section
f = open('/home/zunzun/neutron_refl_bore/rawdata.dat')
rawdata = f.read()
f.close()

xData = []
yData = []
for line in rawdata.split('\n'):
    if line:
        spl = line.split()
        xData.append(float(spl[0]))
        yData.append(float(spl[1]))
xData = numpy.array(xData)
yData = numpy.array(yData)


##########################################################
#rectangularize section
cutoffLimit = 1.6
polynomialOrder = 3 # for baseline curve
modelParameters = numpy.polyfit(xData, yData, polynomialOrder) 
modelPredictions = numpy.polyval(modelParameters, xData)
modelDifference = yData - modelPredictions
maxDiff = max(modelDifference)
minDiff = min(modelDifference)

aboveLimit = (modelDifference > cutoffLimit) * maxDiff
belowLimit = (modelDifference < -cutoffLimit) * minDiff
rectanglarized = modelPredictions +  + belowLimit + aboveLimit

##########################################################
# graphics output section
def ModelAndScatterPlot(graphWidth, graphHeight):
    f = plt.figure(figsize=(graphWidth/100.0, graphHeight/100.0), dpi=100)
    axes = f.add_subplot(111)

    # first the raw data as a scatter plot
    axes.plot(xData, yData,  'D')

    # create data for the fitted equation plot
    xModel = numpy.linspace(min(xData), max(xData))
    yModel = numpy.polyval(modelParameters, xModel)

    # now the model as a line plot
    axes.plot(xModel, yModel)

    # now show rectangularized
    axes.plot(xData, rectanglarized)

    axes.set_title('Data Outside Polynomial Baseline +/- Cutoff') # add a title
    axes.set_xlabel('X Data') # X axis data label
    axes.set_ylabel('Y Data') # Y axis data label

    plt.show()
    plt.close('all') # clean up after using pyplot

graphWidth = 800
graphHeight = 600
ModelAndScatterPlot(graphWidth, graphHeight)
...