Найти похожие подпоследовательности во временном ряду? - PullRequest
1 голос
/ 15 октября 2019

У меня есть тысячи временных рядов (24 измерения данных - 1 измерение на каждый час дня). Из этих временных рядов меня интересует конкретная подпоследовательность или шаблон, который выглядит следующим образом:

image

Меня интересуют подпоследовательности, которые похожи на общийформа выделенного участка - то есть подпоследовательность с резким отрицательным уклоном, за которой следует период в несколько часов, когда уклон является относительно плоским, прежде чем окончательно завершиться с резким положительным уклоном. Я знаю, что интересующие меня подпоследовательности не будут точно соответствовать друг другу и, скорее всего, будут смещены во времени, масштабированы по-разному, будут иметь более длинные / короткие периоды, когда наклон относительно ровный и т. Д., Но я хотел бы найтиспособ обнаружить их всех.

Для этого я разработал простую эвристику (основанную на моем определении выделенного раздела), чтобы быстро найти некоторые интересующие подпоследовательности. Однако мне было интересно, есть ли более элегантный способ (в Python) искать тысячи временных рядов для интересующей меня подпоследовательности (с учетом вышеупомянутых вещей - различий во времени, масштабе и т. Д.). )

1 Ответ

0 голосов
/ 15 октября 2019

Функции ниже должны делать;код написан с интуитивно понятными именами переменных и методов и должен быть понятен для некоторых операций чтения. Код эффективен и масштабируем.


Функциональные возможности :

  • Укажите минимальную и максимальную длину плоской линии
  • Укажите минимальное и максимальное уклоныдля левого и правого хвостов
  • Укажите минимальное и максимальное значения среднее наклоны для левого и правого хвостов в течение нескольких интервалов

Пример :

import numpy as np
import matplotlib.pyplot as plt

# Toy data
t = np.array([[ 5,  3,  3,  5,  3,  3,  3,  3,  3,  5,  5,  3,  3,  0,  4,  
                1,  1, -1, -1,  1,  1,  1,  1, -1,  1,  1, -1,  0,  3,  3,  
                5,  5,  3,  3,  3,  3,  3,  5,  7,  3,  3,  5]]).T
plt.plot(t)
plt.show()

# Get flatline indices
indices = get_flatline_indices(t, min_len=4, max_len=5)
plt.plot(t)
for idx in indices:
    plt.plot(idx, t[idx], marker='o', color='r')
plt.show()

# Filter by edge slopes
lims_left  = (-10, -2)
lims_right = (2,  10)
averaging_intervals = [1, 2, 3]
indices_filtered = filter_by_tail_slopes(indices, t, lims_left, lims_right,
                                         averaging_intervals)
plt.plot(t)
for idx in indices_filtered:
    plt.plot(idx, t[idx], marker='o', color='r')
plt.show()

image

image

image


def get_flatline_indices(sequence, min_len=2, max_len=6):
    indices=[]
    elem_idx = 0
    max_elem_idx = len(sequence) - min_len

    while elem_idx < max_elem_idx:
        current_elem = sequence[elem_idx]
        next_elem    = sequence[elem_idx+1]
        flatline_len = 0

        if current_elem == next_elem:
            while current_elem == next_elem:
                flatline_len += 1
                next_elem = sequence[elem_idx + flatline_len]

            if flatline_len >= min_len:
                if flatline_len > max_len:
                    flatline_len = max_len

                trim_start = elem_idx
                trim_end   = trim_start + flatline_len
                indices_to_append = [index for index in range(trim_start, trim_end)]
                indices += indices_to_append

            elem_idx += flatline_len
            flatline_len = 0
        else:
            elem_idx += 1
    return indices if not all([(entry == []) for entry in indices]) else []
def filter_by_tail_slopes(indices, data, lims_left, lims_right, averaging_intervals=1):
    indices_filtered = []
    indices_temp, tails_temp = [], []
    got_left, got_right = False, False

    for idx in indices:
        slopes_left, slopes_right = _get_slopes(data, idx, averaging_intervals)

        for tail_left, slope_left in enumerate(slopes_left):
            if _valid_slope(slope_left, lims_left):
                if got_left:
                    indices_temp = []  # discard prev if twice in a row
                    tails_temp = []
                indices_temp.append(idx)
                tails_temp.append(tail_left + 1)
                got_left = True
        if got_left:
            for edge_right, slope_right in enumerate(slopes_right):
                if _valid_slope(slope_right, lims_right):
                    if got_right:
                        indices_temp.pop(-1)
                        tails_temp.pop(-1)
                    indices_temp.append(idx)
                    tails_temp.append(edge_right + 1)
                    got_right = True

        if got_left and got_right:
            left_append  = indices_temp[0] - tails_temp[0]
            right_append = indices_temp[1] + tails_temp[1]
            indices_filtered.append(_fill_range(left_append, right_append))
            indices_temp = []
            tails_temp = []
            got_left, got_right = False, False
    return indices_filtered
def _get_slopes(data, idx, averaging_intervals):
    if type(averaging_intervals) == int:
        averaging_intervals = [averaging_intervals]

    slopes_left, slopes_right = [], []
    for interval in averaging_intervals:
        slopes_left  += [(data[idx] - data[idx-interval]) / interval]
        slopes_right += [(data[idx+interval] - data[idx]) / interval]
    return slopes_left, slopes_right

def _valid_slope(slope, lims):
    min_slope, max_slope = lims
    return (slope  >= min_slope) and (slope <= max_slope)

def _fill_range(_min, _max):
    return [i for i in range(_min, _max + 1)]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...