Ниже приведен график ваших данных:

Традиционный способ сделать то, о чем вы говорите, - это интерполировать данные.Это означает добавление кривой наилучшего соответствия, такой как кубический сплайн .

После нахожденияПо дифференцируемой кривой, проходящей через точки, вы найдете первую производную (скорость изменения).Где бы у первого производного не было локального максимума, ваши данные внезапно пошли вверх или вниз.
Однако, все это может быть излишним для ваших целей.Я думаю, что следующий код будет делать то, что вы хотите:
class OnOffRecorder:
"""
"""
def __init__(self, is_active = False):
"""
By default, is not recording data
"""
self.is_active = is_active
self.data = dict()
def turn_recording_on(self):
self.is_active = True
def turn_recording_off(self):
self.is_active = False
def toggle_recording(self):
self.is_active = not(self.is_active)
def push(self, key, value):
if self.is_active:
self.data[key] = value
return
def pop(self):
old_data = self.data
self.data = dict()
return old_data
def get_peak_data(data, delta):
"""
`delta` represents percentage distance between
minimum and maximum of the data.
if data suddenly increases by delta, then begin recording
if data suddenly decreases by delta, then stop recording
"""
mind = min(data)
maxd = max(data)
raynge = maxd - mind
# `li` == `left index`
# `ri` == `right index`
record = OnOffRecorder()
record.turn_recording_off()
for li in range(0, -1 + len(data)):
ri = li + 1
ld = data[li] # left data
rd = data[ri] # right data
if abs(rd-ld)/raynge > delta:
if rd > ld :
record.turn_recording_on()
elif rd < ld:
record.turn_recording_off()
record.push(ri, rd)
return record.pop()
data = [22 , 15 , 6 ,12 ,30 , 45, 200 , 238 ,
220 , 6000, 6250 , 6900, 6700, 6500,
0 , 250 , 6000 ,6800,220, 250, 200]
# if data suddenly increases, then begin recording that data
# if data suddenly decreases, then stop recording that data.
delta = .25
peak_data = get_peak_data(data, delta)
print(peak_data.values())
EDIT:
Я разработал второй / другой подход, который не требует, чтобы вы вручную указали значение delta
.Это описано ниже:
- Сортировка данных.
- ход отсортированных данных от минимального до максимального
- запись различий / скачков.
- поискмаксимум скачков.
- Этот максимальный скачок - это когда переход низких сигналов к высоким сигналам
Ниже приведен код, который реализует пошаговый процесс, описанный выше:
def get_dividing_line(data):
sdata = sorted(data)
jumps = [sdata[i+1] - sdata[i] for i in range(0, -1 + len(sdata))]
jumps_max = max(jumps)
jumps_max_left = jumps.index(jumps_max)
jumps_max_right = [x for x in reversed(jumps)].index(jumps_max)
jumps_max_right = len(sdata) - jumps_max_right
jump_start = sdata[jumps_max_left]
jump_end = sdata[jumps_max_right]
return (jump_start + jump_end)/2
def get_high_signals(data):
threshold = get_dividing_line(data)
return [x for x in data if x >= threshold]
data = [22 , 15 , 6 ,12 ,30 , 45, 200 , 238 ,
220 , 6000, 6250 , 6900, 6700, 6500,
0 , 250 , 6000 ,6800,220, 250, 200]
high_signals = get_high_signals(data)
print(high_signals)
# prints [6000, 6250, 6900, 6700, 6500, 6000, 6800]