Как выбрать окно значений из списка? - PullRequest
0 голосов
/ 30 мая 2019

У меня есть поток данных типа

list  = [22 , 15 , 6 ,12 ,30 , 45, 200 , 238 , 220 , 6000, 6250 , 6900, 6700,6500, 0 , 250 , 6000 ,6800,220, 250,200]

Как видите, поток данных имеет несколько очень высоких значений в середине, что мы и хотим извлечь. Этот блок также может содержать низкие значения (0 и 250 в приведенном выше списке), но определенно более высокие значения в начале и конце блока (6000, 65500). Как мы можем извлечь это конкретное окно данных.

Выход должен быть

new_list = [6000,6250,6900,600,6500,0, 250 , 6000 , 6800]

Я обычно работаю на MATLAB. Таким образом, я бы сделал это, найдя первый и последний пик в данных

Ответы [ 2 ]

0 голосов
/ 30 мая 2019

Вы можете установить порог между самым высоким и самым низким значениями в данных, а затем найти первый и последний индексы записей, которые находятся выше порога, чтобы сформировать диапазон в списке:

data     = [22 , 15 , 6 ,12 ,30 , 45, 200 , 238 , 220 , 6000, 6250 , 6900, 6700,6500, 0 , 250 , 6000 ,6800,220, 250,200]
treshold = (min(data)+max(data))/2
start    = next(i for i,v in enumerate(data) if v >= treshold)
end      = len(data) - next(i for i,v in enumerate(data[::-1]) if v >= treshold)
result   = data[start:end]

print(result)
# [6000, 6250, 6900, 6700, 6500, 0, 250, 6000, 6800]
0 голосов
/ 30 мая 2019

Ниже приведен график ваших данных:

plot of points without line

Традиционный способ сделать то, о чем вы говорите, - это интерполировать данные.Это означает добавление кривой наилучшего соответствия, такой как кубический сплайн .

plot with line

После нахожденияПо дифференцируемой кривой, проходящей через точки, вы найдете первую производную (скорость изменения).Где бы у первого производного не было локального максимума, ваши данные внезапно пошли вверх или вниз.

Однако, все это может быть излишним для ваших целей.Я думаю, что следующий код будет делать то, что вы хотите:

class OnOffRecorder:
    """
    """
    def __init__(self, is_active = False):
        """
        By default, is not recording data
        """
        self.is_active = is_active
        self.data = dict()

    def turn_recording_on(self):
        self.is_active = True

    def turn_recording_off(self):
        self.is_active = False

    def toggle_recording(self):
        self.is_active = not(self.is_active)

    def push(self, key, value):
        if self.is_active:
            self.data[key] = value
        return

    def pop(self):
        old_data = self.data
        self.data = dict()
        return old_data

def get_peak_data(data, delta):
    """
    `delta` represents percentage distance between
            minimum and maximum of the data.

            if data suddenly increases by delta, then begin recording
            if data suddenly decreases by delta, then stop recording
    """
    mind = min(data)
    maxd = max(data)
    raynge = maxd - mind
    # `li` == `left index`
    # `ri` == `right index`
    record = OnOffRecorder()
    record.turn_recording_off()
    for li in range(0, -1 + len(data)):
        ri = li + 1
        ld = data[li] # left data
        rd = data[ri] # right data
        if abs(rd-ld)/raynge > delta:
            if rd > ld :
                record.turn_recording_on()
            elif rd < ld:
                record.turn_recording_off()
        record.push(ri, rd)
    return record.pop()

data = [22 , 15 , 6 ,12 ,30 , 45, 200 , 238 ,
        220 , 6000, 6250 , 6900, 6700, 6500,
        0 , 250 , 6000 ,6800,220, 250, 200]

# if data suddenly increases, then begin recording that data
# if data suddenly decreases, then stop recording that data.
delta = .25
peak_data = get_peak_data(data, delta)
print(peak_data.values()) 

EDIT:

Я разработал второй / другой подход, который не требует, чтобы вы вручную указали значение delta.Это описано ниже:

  1. Сортировка данных.
  2. ход отсортированных данных от минимального до максимального
  3. запись различий / скачков.
  4. поискмаксимум скачков.
  5. Этот максимальный скачок - это когда переход низких сигналов к высоким сигналам

Ниже приведен код, который реализует пошаговый процесс, описанный выше:

def get_dividing_line(data):
    sdata = sorted(data)
    jumps = [sdata[i+1] - sdata[i] for i in range(0, -1 + len(sdata))]
    jumps_max = max(jumps)
    jumps_max_left = jumps.index(jumps_max)
    jumps_max_right = [x for x in reversed(jumps)].index(jumps_max)
    jumps_max_right = len(sdata) - jumps_max_right
    jump_start = sdata[jumps_max_left]
    jump_end = sdata[jumps_max_right]
    return (jump_start + jump_end)/2

def get_high_signals(data):
    threshold = get_dividing_line(data)
    return [x for x in data if x >= threshold]

data = [22 , 15 , 6 ,12 ,30 , 45, 200 , 238 ,
        220 , 6000, 6250 , 6900, 6700, 6500,
        0 , 250 , 6000 ,6800,220, 250, 200]

high_signals = get_high_signals(data)
print(high_signals)
# prints [6000, 6250, 6900, 6700, 6500, 6000, 6800]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...