Нахождение аномальных значений по синусоидальным данным - PullRequest
0 голосов
/ 22 декабря 2018

Как найти аномальные значения по следующим данным.Я моделирую синусоидальный рисунок.Хотя я могу наносить данные и выявлять любые аномалии или шумы в данных, но как я могу это сделать, не нанося данные на график?Я ищу простые подходы, кроме методов машинного обучения.

import random 
import numpy as np 
import matplotlib.pyplot as plt 

N = 10                  # Set signal sample length
t1 = -np.pi             # Simulation begins at t1
t2 =  np.pi;            # Simulation  ends  at t2

in_array = np.linspace(t1, t2, N)
print("in_array : ", in_array)
out_array = np.sin(in_array)

plt.plot(in_array, out_array, color = 'red', marker = "o") ; plt.title("numpy.sin()")

enter image description here

Ввод случайного шума

noise_input = random.uniform(-.5, .5); print("Noise : ",noise_input)

in_array[random.randint(0,len(in_array)-1)] = noise_input
print(in_array)

plt.plot(in_array, out_array, color = 'red', marker = "o") ; plt.title("numpy.sin()")

Данные с шумом

enter image description here

Ответы [ 2 ]

0 голосов
/ 23 декабря 2018

Я подумал о следующем подходе к вашей проблеме, поскольку у вас есть только некоторые значения, которые являются аномальными по вектору времени, это означает, что остальные значения имеют регулярную прогрессию, что означает, что если мы соберем всеуказывает точки в векторе под кластерами и вычисляет средний шаг для самого большого кластера (который, по сути, представляет собой пул значений, представляющих реальную сделку), затем мы можем использовать это среднее значение для обнаружения триады в заданном пороге надвектор и определить, какие из элементов являются аномальными.

Для этого нам понадобятся две функции: calculate_average_step, которая вычислит это среднее значение для наибольшего кластера значений закрытия, а затем нам потребуется detect_anomalous_values, который даст индексы аномальных значений в нашем векторе на основена это среднее значение, рассчитанное ранее.

После того, как мы обнаружили аномальные значения, мы можем пойти дальше и заменить их оценочным значением, которое мы можем определить из нашего среднего значения шага и используя соседние точки в векторе.

import random 
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt 


def calculate_average_step(array, threshold=5):
    """
    Determine the average step by doing a weighted average based on clustering of averages.
    array: our array
    threshold: the +/- offset for grouping clusters. Aplicable on all elements in the array. 
    """

    # determine all the steps
    steps = []
    for i in range(0, len(array) - 1):
        steps.append(abs(array[i] - array[i+1]))

    # determine the steps clusters
    clusters = []
    skip_indexes = []
    cluster_index = 0

    for i in range(len(steps)):
        if i in skip_indexes:
            continue

        # determine the cluster band (based on threshold)
        cluster_lower = steps[i] - (steps[i]/100) * threshold
        cluster_upper = steps[i] + (steps[i]/100) * threshold

        # create the new cluster
        clusters.append([])
        clusters[cluster_index].append(steps[i])

        # try to match elements from the rest of the array
        for j in range(i + 1, len(steps)):

            if not (cluster_lower <= steps[j] <= cluster_upper):
                continue

            clusters[cluster_index].append(steps[j])
            skip_indexes.append(j)

        cluster_index += 1  # increment the cluster id

    clusters = sorted(clusters, key=lambda x: len(x), reverse=True)
    biggest_cluster = clusters[0] if len(clusters) > 0 else None

    if biggest_cluster is None:
        return None

    return sum(biggest_cluster) / len(biggest_cluster)  # return our most common average


def detect_anomalous_values(array, regular_step, threshold=5):
    """
    Will scan every triad (3 points) in the array to detect anomalies.
    array: the array to iterate over.
    regular_step: the step around which we form the upper/lower band for filtering
    treshold: +/- variation between the steps of the first and median element and median and third element.
    """
    assert(len(array) >= 3)  # must have at least 3 elements

    anomalous_indexes = []

    step_lower = regular_step - (regular_step / 100) * threshold
    step_upper = regular_step + (regular_step / 100) * threshold

    # detection will be forward from i (hence 3 elements must be available for the d)
    for i in range(0, len(array) - 2):
        a = array[i]
        b = array[i+1]
        c = array[i+2]

        first_step = abs(a-b)
        second_step = abs(b-c)

        first_belonging = step_lower <= first_step <= step_upper
        second_belonging = step_lower <= second_step <= step_upper

        # detect that both steps are alright
        if first_belonging and second_belonging:
            continue  # all is good here, nothing to do

        # detect if the first point in the triad is bad
        if not first_belonging and second_belonging:
            anomalous_indexes.append(i)

        # detect the last point in the triad is bad
        if first_belonging and not second_belonging:
            anomalous_indexes.append(i+2)

        # detect the mid point in triad is bad (or everything is bad)
        if not first_belonging and not second_belonging:
            anomalous_indexes.append(i+1)
            # we won't add here the others because they will be detected by
            # the rest of the triad scans

    return sorted(set(anomalous_indexes))  # return unique indexes

if __name__ == "__main__":

    N = 10                  # Set signal sample length
    t1 = -np.pi             # Simulation begins at t1
    t2 =  np.pi;            # Simulation  ends  at t2

    in_array = np.linspace(t1, t2, N)

    # add some noise
    noise_input = random.uniform(-.5, .5);
    in_array[random.randint(0, len(in_array)-1)] = noise_input
    noisy_out_array = np.sin(in_array)

    # display noisy sin
    plt.figure()
    plt.plot(in_array, noisy_out_array, color = 'red', marker = "o");
    plt.title("noisy numpy.sin()")

    # detect anomalous values
    average_step = calculate_average_step(in_array)
    anomalous_indexes = detect_anomalous_values(in_array, average_step)

    # replace anomalous points with an estimated value based on our calculated average
    for anomalous in anomalous_indexes:

        # try forward extrapolation
        try:
            in_array[anomalous] = in_array[anomalous-1] + average_step
        # else try backwward extrapolation
        except IndexError:
            in_array[anomalous] = in_array[anomalous+1] - average_step

    # generate sine wave
    out_array = np.sin(in_array)

    plt.figure()
    plt.plot(in_array, out_array, color = 'green', marker = "o");
    plt.title("cleaned numpy.sin()")

    plt.show()

Синус с шумом:

noisy sine

Чистый синус:

cleaned sine

0 голосов
/ 22 декабря 2018

Ваша проблема связана с вектором времени (который имеет 1 измерение).Вам нужно будет применить какой-то фильтр к этому вектору.

Первое, что пришло в голову, это medfilt (медианный фильтр) из scipy, и это выглядит примерно так:

from scipy.signal import medfilt
l1 = [0, 10, 20, 30, 2, 50, 70, 15, 90, 100]
l2 = medfilt(l1)
print(l2)

результат этого будет:

[ 0. 10. 20. 20. 30. 50. 50. 70. 90. 90.]

проблема этого фильтра в том, что если мы применим некоторые значения шума к краям вектора, например [200, 0, 10, 20, 30, 2, 50, 70, 15, 90, 100, -50], то результат будет примерно таким[ 0. 10. 10. 20. 20. 30. 50. 50. 70. 90. 90. 0.] и, очевидно, это не совсем нормально для графика синуса, поскольку он будет производить те же артефакты для массива значений синуса.

Лучший подход к этой проблеме - рассматривать вектор времени как выход y.и это значения индекса как вход x и выполняют линейную регрессию для «линейной функции времени» , а не кавычек, это просто означает, что мы подделываем 2-мерную модельприменяя поддельный X вектор.Код подразумевает использование функции scipy linregress (линейная регрессия):

from scipy.stats import linregress
l1 = [5, 0, 10, 20, 30, -20, 50, 70, 15, 90, 100]
l1_x = range(0, len(l1))

slope, intercept, r_val, p_val, std_err = linregress(l1_x, l1)
l1 = intercept + slope * l1_x

print(l1)

, чей вывод будет:

[-10.45454545  -1.63636364   7.18181818  16.          24.81818182
  33.63636364  42.45454545  51.27272727  60.09090909  68.90909091
  77.72727273]

Теперь давайте применим это к вашему временивектор.

import random 
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt 
from scipy.stats import linregress

N = 20
# N = 10                  # Set signal sample length
t1 = -np.pi             # Simulation begins at t1
t2 =  np.pi;            # Simulation  ends  at t2

in_array = np.linspace(t1, t2, N)

# add some noise
noise_input = random.uniform(-.5, .5);
in_array[random.randint(0, len(in_array)-1)] = noise_input

# apply filter on time array
in_array_x = range(0, len(in_array))

slope, intercept, r_val, p_val, std_err = linregress(in_array_x, in_array)
in_array = intercept + slope * in_array_x

# generate sine wave
out_array = np.sin(in_array)
print("OUT ARRAY")
print(out_array)

plt.plot(in_array, out_array, color = 'red', marker = "o") ; plt.title("numpy.sin()")

plt.show()

выходной будет:

linear regression on time vector for sine

результирующий сигнал будет аппроксимацией оригинала, так какс любой формой экстраполяции / интерполяции / регрессионной фильтрации.

...